diff --git a/server/src/domain/library/library.service.ts b/server/src/domain/library/library.service.ts index 6a5982abef..eb80a94886 100644 --- a/server/src/domain/library/library.service.ts +++ b/server/src/domain/library/library.service.ts @@ -13,9 +13,11 @@ import { usePagination, validateCronExpression } from '../domain.util'; import { IBaseJob, IEntityJob, ILibraryFileJob, ILibraryRefreshJob, JOBS_ASSET_PAGINATION_SIZE, JobName } from '../job'; import { + DatabaseLock, IAccessRepository, IAssetRepository, ICryptoRepository, + IDatabaseRepository, IJobRepository, ILibraryRepository, IStorageRepository, @@ -53,6 +55,7 @@ export class LibraryService extends EventEmitter { @Inject(ILibraryRepository) private repository: ILibraryRepository, @Inject(IStorageRepository) private storageRepository: IStorageRepository, @Inject(IUserRepository) private userRepository: IUserRepository, + @Inject(IDatabaseRepository) private databaseRepository: IDatabaseRepository, ) { super(); this.access = AccessCore.create(accessRepository); @@ -103,59 +106,61 @@ export class LibraryService extends EventEmitter { return false; } - await this.unwatch(id); + this.databaseRepository.withTryLock(DatabaseLock.LibraryWatch, async () => { + await this.unwatch(id); - this.logger.log(`Starting to watch library ${library.id} with import path(s) ${library.importPaths}`); + this.logger.log(`Starting to watch library ${library.id} with import path(s) ${library.importPaths}`); - const matcher = picomatch(`**/*{${mimeTypes.getSupportedFileExtensions().join(',')}}`, { - nocase: true, - ignore: library.exclusionPatterns, + const matcher = picomatch(`**/*{${mimeTypes.getSupportedFileExtensions().join(',')}}`, { + nocase: true, + ignore: library.exclusionPatterns, + }); + + let _resolve: () => void; + const ready$ = new Promise((resolve) => (_resolve = resolve)); + + this.watchers[id] = this.storageRepository.watch( + library.importPaths, + { + usePolling: false, + ignoreInitial: true, + }, + { + onReady: () => _resolve(), + onAdd: async (path) => { + this.logger.debug(`File add event received for ${path} in library ${library.id}}`); + if (matcher(path)) { + await this.scanAssets(library.id, [path], library.ownerId, false); + } + this.emit('add', path); + }, + onChange: async (path) => { + this.logger.debug(`Detected file change for ${path} in library ${library.id}`); + if (matcher(path)) { + // Note: if the changed file was not previously imported, it will be imported now. + await this.scanAssets(library.id, [path], library.ownerId, false); + } + this.emit('change', path); + }, + onUnlink: async (path) => { + this.logger.debug(`Detected deleted file at ${path} in library ${library.id}`); + const asset = await this.assetRepository.getByLibraryIdAndOriginalPath(library.id, path); + if (asset && matcher(path)) { + await this.assetRepository.save({ id: asset.id, isOffline: true }); + } + this.emit('unlink', path); + }, + onError: (error) => { + // TODO: should we log, or throw an exception? + this.logger.error(`Library watcher for library ${library.id} encountered error: ${error}`); + }, + }, + ); + + // Wait for the watcher to initialize before returning + await ready$; }); - let _resolve: () => void; - const ready$ = new Promise((resolve) => (_resolve = resolve)); - - this.watchers[id] = this.storageRepository.watch( - library.importPaths, - { - usePolling: false, - ignoreInitial: true, - }, - { - onReady: () => _resolve(), - onAdd: async (path) => { - this.logger.debug(`File add event received for ${path} in library ${library.id}}`); - if (matcher(path)) { - await this.scanAssets(library.id, [path], library.ownerId, false); - } - this.emit('add', path); - }, - onChange: async (path) => { - this.logger.debug(`Detected file change for ${path} in library ${library.id}`); - if (matcher(path)) { - // Note: if the changed file was not previously imported, it will be imported now. - await this.scanAssets(library.id, [path], library.ownerId, false); - } - this.emit('change', path); - }, - onUnlink: async (path) => { - this.logger.debug(`Detected deleted file at ${path} in library ${library.id}`); - const asset = await this.assetRepository.getByLibraryIdAndOriginalPath(library.id, path); - if (asset && matcher(path)) { - await this.assetRepository.save({ id: asset.id, isOffline: true }); - } - this.emit('unlink', path); - }, - onError: (error) => { - // TODO: should we log, or throw an exception? - this.logger.error(`Library watcher for library ${library.id} encountered error: ${error}`); - }, - }, - ); - - // Wait for the watcher to initialize before returning - await ready$; - return true; } diff --git a/server/src/domain/repositories/database.repository.ts b/server/src/domain/repositories/database.repository.ts index d32939fe61..0b4317e37c 100644 --- a/server/src/domain/repositories/database.repository.ts +++ b/server/src/domain/repositories/database.repository.ts @@ -19,6 +19,7 @@ export enum DatabaseLock { Migrations = 200, StorageTemplateMigration = 420, CLIPDimSize = 512, + LibraryWatch = 1337, } export const extName: Record = { @@ -46,6 +47,7 @@ export interface IDatabaseRepository { shouldReindex(name: VectorIndex): Promise; runMigrations(options?: { transaction?: 'all' | 'none' | 'each' }): Promise; withLock(lock: DatabaseLock, callback: () => Promise): Promise; + withTryLock(lock: DatabaseLock, callback: () => Promise): Promise; isBusy(lock: DatabaseLock): boolean; wait(lock: DatabaseLock): Promise; } diff --git a/server/src/infra/repositories/database.repository.ts b/server/src/infra/repositories/database.repository.ts index b0e4623af5..d720f24747 100644 --- a/server/src/infra/repositories/database.repository.ts +++ b/server/src/infra/repositories/database.repository.ts @@ -210,6 +210,25 @@ export class DatabaseRepository implements IDatabaseRepository { return res as R; } + async withTryLock(lock: DatabaseLock, callback: () => Promise): Promise { + let res; + const queryRunner = this.dataSource.createQueryRunner(); + try { + const lockAcquired = await this.tryLock(lock, queryRunner); + if (lockAcquired) { + res = await callback(); + } + } finally { + try { + await this.releaseLock(lock, queryRunner); + } finally { + await queryRunner.release(); + } + } + + return res as R; + } + isBusy(lock: DatabaseLock): boolean { return this.asyncLock.isBusy(DatabaseLock[lock]); } @@ -222,6 +241,10 @@ export class DatabaseRepository implements IDatabaseRepository { return queryRunner.query('SELECT pg_advisory_lock($1)', [lock]); } + private async tryLock(lock: DatabaseLock, queryRunner: QueryRunner): Promise { + return queryRunner.query('SELECT pg_try_advisory_lock($1)', [lock]); + } + private async releaseLock(lock: DatabaseLock, queryRunner: QueryRunner): Promise { return queryRunner.query('SELECT pg_advisory_unlock($1)', [lock]); }