fix: ensure lock stays on
This commit is contained in:
@@ -74,11 +74,9 @@ export class LibraryService extends EventEmitter {
|
|||||||
const { watch, scan } = config.library;
|
const { watch, scan } = config.library;
|
||||||
this.watchLibraries = false;
|
this.watchLibraries = false;
|
||||||
if (watch.enabled) {
|
if (watch.enabled) {
|
||||||
this.databaseRepository.withTryLock(DatabaseLock.LibraryWatch, async () => {
|
|
||||||
// This ensures that library watching only occurs in one microservice
|
// This ensures that library watching only occurs in one microservice
|
||||||
// TODO: we could make the lock be per-library instead of global
|
// TODO: we could make the lock be per-library instead of global
|
||||||
this.watchLibraries = true;
|
this.watchLibraries = await this.databaseRepository.tryLock(DatabaseLock.LibraryWatch);
|
||||||
});
|
|
||||||
}
|
}
|
||||||
this.jobRepository.addCronJob(
|
this.jobRepository.addCronJob(
|
||||||
'libraryScan',
|
'libraryScan',
|
||||||
|
|||||||
@@ -47,7 +47,7 @@ export interface IDatabaseRepository {
|
|||||||
shouldReindex(name: VectorIndex): Promise<boolean>;
|
shouldReindex(name: VectorIndex): Promise<boolean>;
|
||||||
runMigrations(options?: { transaction?: 'all' | 'none' | 'each' }): Promise<void>;
|
runMigrations(options?: { transaction?: 'all' | 'none' | 'each' }): Promise<void>;
|
||||||
withLock<R>(lock: DatabaseLock, callback: () => Promise<R>): Promise<R>;
|
withLock<R>(lock: DatabaseLock, callback: () => Promise<R>): Promise<R>;
|
||||||
withTryLock<R>(lock: DatabaseLock, callback: () => Promise<R>): Promise<R>;
|
tryLock(lock: DatabaseLock): Promise<boolean>;
|
||||||
isBusy(lock: DatabaseLock): boolean;
|
isBusy(lock: DatabaseLock): boolean;
|
||||||
wait(lock: DatabaseLock): Promise<void>;
|
wait(lock: DatabaseLock): Promise<void>;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -210,23 +210,9 @@ export class DatabaseRepository implements IDatabaseRepository {
|
|||||||
return res as R;
|
return res as R;
|
||||||
}
|
}
|
||||||
|
|
||||||
async withTryLock<R>(lock: DatabaseLock, callback: () => Promise<R>): Promise<R> {
|
async tryLock(lock: DatabaseLock): Promise<boolean> {
|
||||||
let res;
|
|
||||||
const queryRunner = this.dataSource.createQueryRunner();
|
const queryRunner = this.dataSource.createQueryRunner();
|
||||||
try {
|
return await this.acquireTryLock(lock, queryRunner);
|
||||||
const lockAcquired = await this.tryLock(lock, queryRunner);
|
|
||||||
if (lockAcquired) {
|
|
||||||
res = await callback();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
try {
|
|
||||||
await this.releaseLock(lock, queryRunner);
|
|
||||||
} finally {
|
|
||||||
await queryRunner.release();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return res as R;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
isBusy(lock: DatabaseLock): boolean {
|
isBusy(lock: DatabaseLock): boolean {
|
||||||
@@ -241,7 +227,7 @@ export class DatabaseRepository implements IDatabaseRepository {
|
|||||||
return queryRunner.query('SELECT pg_advisory_lock($1)', [lock]);
|
return queryRunner.query('SELECT pg_advisory_lock($1)', [lock]);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async tryLock(lock: DatabaseLock, queryRunner: QueryRunner): Promise<boolean> {
|
private async acquireTryLock(lock: DatabaseLock, queryRunner: QueryRunner): Promise<boolean> {
|
||||||
return queryRunner.query('SELECT pg_try_advisory_lock($1)', [lock]);
|
return queryRunner.query('SELECT pg_try_advisory_lock($1)', [lock]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ export const newDatabaseRepositoryMock = (): jest.Mocked<IDatabaseRepository> =>
|
|||||||
shouldReindex: jest.fn(),
|
shouldReindex: jest.fn(),
|
||||||
runMigrations: jest.fn(),
|
runMigrations: jest.fn(),
|
||||||
withLock: jest.fn().mockImplementation((_, function_: <R>() => Promise<R>) => function_()),
|
withLock: jest.fn().mockImplementation((_, function_: <R>() => Promise<R>) => function_()),
|
||||||
|
tryLock: jest.fn(),
|
||||||
isBusy: jest.fn(),
|
isBusy: jest.fn(),
|
||||||
wait: jest.fn(),
|
wait: jest.fn(),
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user