add library teardown

This commit is contained in:
Jonathan Jogenfors
2024-02-29 22:50:06 +01:00
parent bd4c03067b
commit 735747c939
8 changed files with 25 additions and 11 deletions
@@ -33,7 +33,7 @@ describe(`Library watcher (e2e)`, () => {
}); });
afterEach(async () => { afterEach(async () => {
await libraryService.unwatchAll(); await libraryService.teardown();
}); });
afterAll(async () => { afterAll(async () => {
@@ -1312,7 +1312,7 @@ describe(LibraryService.name, () => {
storageMock.watch.mockImplementation(makeMockWatcher({ close: mockClose })); storageMock.watch.mockImplementation(makeMockWatcher({ close: mockClose }));
await sut.init(); await sut.init();
await sut.unwatchAll(); await sut.tearDown();
expect(mockClose).toHaveBeenCalledTimes(2); expect(mockClose).toHaveBeenCalledTimes(2);
}); });
+10 -2
View File
@@ -78,8 +78,8 @@ export class LibraryService extends EventEmitter {
if (watch.enabled) { if (watch.enabled) {
// This ensures that library watching only occurs in one microservice // This ensures that library watching only occurs in one microservice
// TODO: we could make the lock be per-library instead of global // TODO: we could make the lock be per-library instead of global
this.watchLock = await this.databaseRepository.tryLock(DatabaseLock.LibraryWatch); this.watchLock = await this.databaseRepository.tryLock(DatabaseLock.LibraryWatch);
this.watchLibraries = this.watchLock; this.watchLibraries = this.watchLock;
} }
this.jobRepository.addCronJob( this.jobRepository.addCronJob(
@@ -179,7 +179,15 @@ export class LibraryService extends EventEmitter {
} }
} }
async unwatchAll() { async teardown() {
await this.unwatchAll();
if (this.watchLock) {
await this.databaseRepository.releaseLock(DatabaseLock.LibraryWatch);
}
}
private async unwatchAll() {
for (const id in this.watchers) { for (const id in this.watchers) {
await this.unwatch(id); await this.unwatch(id);
} }
@@ -48,6 +48,7 @@ export interface IDatabaseRepository {
runMigrations(options?: { transaction?: 'all' | 'none' | 'each' }): Promise<void>; runMigrations(options?: { transaction?: 'all' | 'none' | 'each' }): Promise<void>;
withLock<R>(lock: DatabaseLock, callback: () => Promise<R>): Promise<R>; withLock<R>(lock: DatabaseLock, callback: () => Promise<R>): Promise<R>;
tryLock(lock: DatabaseLock): Promise<boolean>; tryLock(lock: DatabaseLock): Promise<boolean>;
releaseLock(lock: DatabaseLock): Promise<void>;
isBusy(lock: DatabaseLock): boolean; isBusy(lock: DatabaseLock): boolean;
wait(lock: DatabaseLock): Promise<void>; wait(lock: DatabaseLock): Promise<void>;
} }
@@ -200,7 +200,7 @@ export class DatabaseRepository implements IDatabaseRepository {
res = await callback(); res = await callback();
} finally { } finally {
try { try {
await this.releaseLock(lock, queryRunner); await this._releaseLock(lock, queryRunner);
} finally { } finally {
await queryRunner.release(); await queryRunner.release();
} }
@@ -215,6 +215,11 @@ export class DatabaseRepository implements IDatabaseRepository {
return await this.acquireTryLock(lock, queryRunner); return await this.acquireTryLock(lock, queryRunner);
} }
async releaseLock(lock: DatabaseLock): Promise<void> {
const queryRunner = this.dataSource.createQueryRunner();
return await this._releaseLock(lock, queryRunner);
}
isBusy(lock: DatabaseLock): boolean { isBusy(lock: DatabaseLock): boolean {
return this.asyncLock.isBusy(DatabaseLock[lock]); return this.asyncLock.isBusy(DatabaseLock[lock]);
} }
@@ -232,7 +237,7 @@ export class DatabaseRepository implements IDatabaseRepository {
return lockResult[0].pg_try_advisory_lock; return lockResult[0].pg_try_advisory_lock;
} }
private async releaseLock(lock: DatabaseLock, queryRunner: QueryRunner): Promise<void> { private async _releaseLock(lock: DatabaseLock, queryRunner: QueryRunner): Promise<void> {
return queryRunner.query('SELECT pg_advisory_unlock($1)', [lock]); return queryRunner.query('SELECT pg_advisory_unlock($1)', [lock]);
} }
} }
+1 -2
View File
@@ -87,8 +87,7 @@ export class AppService {
} }
async teardown() { async teardown() {
await this.libraryService.unwatchAll(); await this.libraryService.teardown();
await this.metadataService.teardown(); await this.metadataService.teardown();
} }
} }
+3 -3
View File
@@ -48,6 +48,9 @@ export const db = {
if (deleteUsers) { if (deleteUsers) {
await em.query(`DELETE FROM "users" CASCADE;`); await em.query(`DELETE FROM "users" CASCADE;`);
} }
// Release all locks
await em.query('SELECT pg_advisory_unlock_all()');
}); });
}, },
disconnect: async () => { disconnect: async () => {
@@ -124,9 +127,6 @@ export const testApp = {
}, },
reset: async (options?: ResetOptions) => { reset: async (options?: ResetOptions) => {
await db.reset(options); await db.reset(options);
await app.get(AppService).init();
await app.get(MicroAppService).init();
}, },
get: (member: any) => app.get(member), get: (member: any) => app.get(member),
teardown: async () => { teardown: async () => {
@@ -14,6 +14,7 @@ export const newDatabaseRepositoryMock = (): jest.Mocked<IDatabaseRepository> =>
runMigrations: jest.fn(), runMigrations: jest.fn(),
withLock: jest.fn().mockImplementation((_, function_: <R>() => Promise<R>) => function_()), withLock: jest.fn().mockImplementation((_, function_: <R>() => Promise<R>) => function_()),
tryLock: jest.fn(), tryLock: jest.fn(),
releaseLock: jest.fn(),
isBusy: jest.fn(), isBusy: jest.fn(),
wait: jest.fn(), wait: jest.fn(),
}; };