add try lock

This commit is contained in:
Jonathan Jogenfors
2024-02-29 18:45:28 +01:00
parent 19c4a8bf33
commit c07ef59167
3 changed files with 79 additions and 49 deletions

View File

@@ -13,9 +13,11 @@ import { usePagination, validateCronExpression } from '../domain.util';
import { IBaseJob, IEntityJob, ILibraryFileJob, ILibraryRefreshJob, JOBS_ASSET_PAGINATION_SIZE, JobName } from '../job';
import {
DatabaseLock,
IAccessRepository,
IAssetRepository,
ICryptoRepository,
IDatabaseRepository,
IJobRepository,
ILibraryRepository,
IStorageRepository,
@@ -53,6 +55,7 @@ export class LibraryService extends EventEmitter {
@Inject(ILibraryRepository) private repository: ILibraryRepository,
@Inject(IStorageRepository) private storageRepository: IStorageRepository,
@Inject(IUserRepository) private userRepository: IUserRepository,
@Inject(IDatabaseRepository) private databaseRepository: IDatabaseRepository,
) {
super();
this.access = AccessCore.create(accessRepository);
@@ -103,59 +106,61 @@ export class LibraryService extends EventEmitter {
return false;
}
await this.unwatch(id);
this.databaseRepository.withTryLock(DatabaseLock.LibraryWatch, async () => {
await this.unwatch(id);
this.logger.log(`Starting to watch library ${library.id} with import path(s) ${library.importPaths}`);
this.logger.log(`Starting to watch library ${library.id} with import path(s) ${library.importPaths}`);
const matcher = picomatch(`**/*{${mimeTypes.getSupportedFileExtensions().join(',')}}`, {
nocase: true,
ignore: library.exclusionPatterns,
const matcher = picomatch(`**/*{${mimeTypes.getSupportedFileExtensions().join(',')}}`, {
nocase: true,
ignore: library.exclusionPatterns,
});
let _resolve: () => void;
const ready$ = new Promise<void>((resolve) => (_resolve = resolve));
this.watchers[id] = this.storageRepository.watch(
library.importPaths,
{
usePolling: false,
ignoreInitial: true,
},
{
onReady: () => _resolve(),
onAdd: async (path) => {
this.logger.debug(`File add event received for ${path} in library ${library.id}}`);
if (matcher(path)) {
await this.scanAssets(library.id, [path], library.ownerId, false);
}
this.emit('add', path);
},
onChange: async (path) => {
this.logger.debug(`Detected file change for ${path} in library ${library.id}`);
if (matcher(path)) {
// Note: if the changed file was not previously imported, it will be imported now.
await this.scanAssets(library.id, [path], library.ownerId, false);
}
this.emit('change', path);
},
onUnlink: async (path) => {
this.logger.debug(`Detected deleted file at ${path} in library ${library.id}`);
const asset = await this.assetRepository.getByLibraryIdAndOriginalPath(library.id, path);
if (asset && matcher(path)) {
await this.assetRepository.save({ id: asset.id, isOffline: true });
}
this.emit('unlink', path);
},
onError: (error) => {
// TODO: should we log, or throw an exception?
this.logger.error(`Library watcher for library ${library.id} encountered error: ${error}`);
},
},
);
// Wait for the watcher to initialize before returning
await ready$;
});
let _resolve: () => void;
const ready$ = new Promise<void>((resolve) => (_resolve = resolve));
this.watchers[id] = this.storageRepository.watch(
library.importPaths,
{
usePolling: false,
ignoreInitial: true,
},
{
onReady: () => _resolve(),
onAdd: async (path) => {
this.logger.debug(`File add event received for ${path} in library ${library.id}}`);
if (matcher(path)) {
await this.scanAssets(library.id, [path], library.ownerId, false);
}
this.emit('add', path);
},
onChange: async (path) => {
this.logger.debug(`Detected file change for ${path} in library ${library.id}`);
if (matcher(path)) {
// Note: if the changed file was not previously imported, it will be imported now.
await this.scanAssets(library.id, [path], library.ownerId, false);
}
this.emit('change', path);
},
onUnlink: async (path) => {
this.logger.debug(`Detected deleted file at ${path} in library ${library.id}`);
const asset = await this.assetRepository.getByLibraryIdAndOriginalPath(library.id, path);
if (asset && matcher(path)) {
await this.assetRepository.save({ id: asset.id, isOffline: true });
}
this.emit('unlink', path);
},
onError: (error) => {
// TODO: should we log, or throw an exception?
this.logger.error(`Library watcher for library ${library.id} encountered error: ${error}`);
},
},
);
// Wait for the watcher to initialize before returning
await ready$;
return true;
}

View File

@@ -19,6 +19,7 @@ export enum DatabaseLock {
Migrations = 200,
StorageTemplateMigration = 420,
CLIPDimSize = 512,
LibraryWatch = 1337,
}
export const extName: Record<DatabaseExtension, string> = {
@@ -46,6 +47,7 @@ export interface IDatabaseRepository {
shouldReindex(name: VectorIndex): Promise<boolean>;
runMigrations(options?: { transaction?: 'all' | 'none' | 'each' }): Promise<void>;
withLock<R>(lock: DatabaseLock, callback: () => Promise<R>): Promise<R>;
withTryLock<R>(lock: DatabaseLock, callback: () => Promise<R>): Promise<R>;
isBusy(lock: DatabaseLock): boolean;
wait(lock: DatabaseLock): Promise<void>;
}

View File

@@ -210,6 +210,25 @@ export class DatabaseRepository implements IDatabaseRepository {
return res as R;
}
async withTryLock<R>(lock: DatabaseLock, callback: () => Promise<R>): Promise<R> {
let res;
const queryRunner = this.dataSource.createQueryRunner();
try {
const lockAcquired = await this.tryLock(lock, queryRunner);
if (lockAcquired) {
res = await callback();
}
} finally {
try {
await this.releaseLock(lock, queryRunner);
} finally {
await queryRunner.release();
}
}
return res as R;
}
isBusy(lock: DatabaseLock): boolean {
return this.asyncLock.isBusy(DatabaseLock[lock]);
}
@@ -222,6 +241,10 @@ export class DatabaseRepository implements IDatabaseRepository {
return queryRunner.query('SELECT pg_advisory_lock($1)', [lock]);
}
private async tryLock(lock: DatabaseLock, queryRunner: QueryRunner): Promise<boolean> {
return queryRunner.query('SELECT pg_try_advisory_lock($1)', [lock]);
}
private async releaseLock(lock: DatabaseLock, queryRunner: QueryRunner): Promise<void> {
return queryRunner.query('SELECT pg_advisory_unlock($1)', [lock]);
}