feat: audit cleanup (#21567)
This commit is contained in:
@@ -42,6 +42,7 @@ describe(JobService.name, () => {
|
||||
{ name: JobName.PersonCleanup },
|
||||
{ name: JobName.MemoryCleanup },
|
||||
{ name: JobName.SessionCleanup },
|
||||
{ name: JobName.AuditTableCleanup },
|
||||
{ name: JobName.AuditLogCleanup },
|
||||
{ name: JobName.MemoryGenerate },
|
||||
{ name: JobName.UserSyncUsage },
|
||||
|
||||
@@ -281,6 +281,7 @@ export class JobService extends BaseService {
|
||||
{ name: JobName.PersonCleanup },
|
||||
{ name: JobName.MemoryCleanup },
|
||||
{ name: JobName.SessionCleanup },
|
||||
{ name: JobName.AuditTableCleanup },
|
||||
{ name: JobName.AuditLogCleanup },
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
import { BadRequestException, ForbiddenException, Injectable } from '@nestjs/common';
|
||||
import { Insertable } from 'kysely';
|
||||
import { DateTime } from 'luxon';
|
||||
import { DateTime, Duration } from 'luxon';
|
||||
import { Writable } from 'node:stream';
|
||||
import { AUDIT_LOG_MAX_DURATION } from 'src/constants';
|
||||
import { OnJob } from 'src/decorators';
|
||||
import { AssetResponseDto, mapAsset } from 'src/dtos/asset-response.dto';
|
||||
import { AuthDto } from 'src/dtos/auth.dto';
|
||||
import {
|
||||
@@ -15,7 +16,16 @@ import {
|
||||
SyncItem,
|
||||
SyncStreamDto,
|
||||
} from 'src/dtos/sync.dto';
|
||||
import { AssetVisibility, DatabaseAction, EntityType, Permission, SyncEntityType, SyncRequestType } from 'src/enum';
|
||||
import {
|
||||
AssetVisibility,
|
||||
DatabaseAction,
|
||||
EntityType,
|
||||
JobName,
|
||||
Permission,
|
||||
QueueName,
|
||||
SyncEntityType,
|
||||
SyncRequestType,
|
||||
} from 'src/enum';
|
||||
import { SyncQueryOptions } from 'src/repositories/sync.repository';
|
||||
import { SessionSyncCheckpointTable } from 'src/schema/tables/sync-checkpoint.table';
|
||||
import { BaseService } from 'src/services/base.service';
|
||||
@@ -32,6 +42,8 @@ type AssetLike = Omit<SyncAssetV1, 'checksum' | 'thumbhash'> & {
|
||||
};
|
||||
|
||||
const COMPLETE_ID = 'complete';
|
||||
const MAX_DAYS = 30;
|
||||
const MAX_DURATION = Duration.fromObject({ days: MAX_DAYS });
|
||||
|
||||
const mapSyncAssetV1 = ({ checksum, thumbhash, ...data }: AssetLike): SyncAssetV1 => ({
|
||||
...data,
|
||||
@@ -137,19 +149,24 @@ export class SyncService extends BaseService {
|
||||
}
|
||||
|
||||
const isPendingSyncReset = await this.sessionRepository.isPendingSyncReset(session.id);
|
||||
|
||||
if (isPendingSyncReset) {
|
||||
send(response, { type: SyncEntityType.SyncResetV1, ids: ['reset'], data: {} });
|
||||
response.end();
|
||||
return;
|
||||
}
|
||||
|
||||
const checkpoints = await this.syncCheckpointRepository.getAll(session.id);
|
||||
const checkpointMap: CheckpointMap = Object.fromEntries(checkpoints.map(({ type, ack }) => [type, fromAck(ack)]));
|
||||
|
||||
if (this.needsFullSync(checkpointMap)) {
|
||||
send(response, { type: SyncEntityType.SyncResetV1, ids: ['reset'], data: {} });
|
||||
response.end();
|
||||
return;
|
||||
}
|
||||
|
||||
const { nowId } = await this.syncCheckpointRepository.getNow();
|
||||
const options: SyncQueryOptions = { nowId, userId: auth.user.id };
|
||||
|
||||
const checkpoints = await this.syncCheckpointRepository.getAll(session.id);
|
||||
const checkpointMap: CheckpointMap = Object.fromEntries(checkpoints.map(({ type, ack }) => [type, fromAck(ack)]));
|
||||
|
||||
const handlers: Record<SyncRequestType, () => Promise<void>> = {
|
||||
[SyncRequestType.AuthUsersV1]: () => this.syncAuthUsersV1(options, response, checkpointMap),
|
||||
[SyncRequestType.UsersV1]: () => this.syncUsersV1(options, response, checkpointMap),
|
||||
@@ -180,9 +197,41 @@ export class SyncService extends BaseService {
|
||||
await handler();
|
||||
}
|
||||
|
||||
send(response, { type: SyncEntityType.SyncCompleteV1, ids: [nowId], data: {} });
|
||||
|
||||
response.end();
|
||||
}
|
||||
|
||||
@OnJob({ name: JobName.AuditTableCleanup, queue: QueueName.BackgroundTask })
|
||||
async onAuditTableCleanup() {
|
||||
const pruneThreshold = MAX_DAYS + 1;
|
||||
|
||||
await this.syncRepository.album.cleanupAuditTable(pruneThreshold);
|
||||
await this.syncRepository.albumUser.cleanupAuditTable(pruneThreshold);
|
||||
await this.syncRepository.albumToAsset.cleanupAuditTable(pruneThreshold);
|
||||
await this.syncRepository.asset.cleanupAuditTable(pruneThreshold);
|
||||
await this.syncRepository.assetFace.cleanupAuditTable(pruneThreshold);
|
||||
await this.syncRepository.assetMetadata.cleanupAuditTable(pruneThreshold);
|
||||
await this.syncRepository.memory.cleanupAuditTable(pruneThreshold);
|
||||
await this.syncRepository.memoryToAsset.cleanupAuditTable(pruneThreshold);
|
||||
await this.syncRepository.partner.cleanupAuditTable(pruneThreshold);
|
||||
await this.syncRepository.person.cleanupAuditTable(pruneThreshold);
|
||||
await this.syncRepository.stack.cleanupAuditTable(pruneThreshold);
|
||||
await this.syncRepository.user.cleanupAuditTable(pruneThreshold);
|
||||
await this.syncRepository.userMetadata.cleanupAuditTable(pruneThreshold);
|
||||
}
|
||||
|
||||
private needsFullSync(checkpointMap: CheckpointMap) {
|
||||
const completeAck = checkpointMap[SyncEntityType.SyncCompleteV1];
|
||||
if (!completeAck) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const milliseconds = Number.parseInt(completeAck.updateId.replaceAll('-', '').slice(0, 12), 16);
|
||||
|
||||
return DateTime.fromMillis(milliseconds) < DateTime.now().minus(MAX_DURATION);
|
||||
}
|
||||
|
||||
private async syncAuthUsersV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
|
||||
const upsertType = SyncEntityType.AuthUserV1;
|
||||
const upserts = this.syncRepository.authUser.getUpserts({ ...options, ack: checkpointMap[upsertType] });
|
||||
@@ -719,13 +768,13 @@ export class SyncService extends BaseService {
|
||||
|
||||
private async syncPeopleV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
|
||||
const deleteType = SyncEntityType.PersonDeleteV1;
|
||||
const deletes = this.syncRepository.people.getDeletes({ ...options, ack: checkpointMap[deleteType] });
|
||||
const deletes = this.syncRepository.person.getDeletes({ ...options, ack: checkpointMap[deleteType] });
|
||||
for await (const { id, ...data } of deletes) {
|
||||
send(response, { type: deleteType, ids: [id], data });
|
||||
}
|
||||
|
||||
const upsertType = SyncEntityType.PersonV1;
|
||||
const upserts = this.syncRepository.people.getUpserts({ ...options, ack: checkpointMap[upsertType] });
|
||||
const upserts = this.syncRepository.person.getUpserts({ ...options, ack: checkpointMap[upsertType] });
|
||||
for await (const { updateId, ...data } of upserts) {
|
||||
send(response, { type: upsertType, ids: [updateId], data });
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user