feat: nightly tasks (#19879)
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
import { BadRequestException, Injectable } from '@nestjs/common';
|
||||
import { ClassConstructor } from 'class-transformer';
|
||||
import { snakeCase } from 'lodash';
|
||||
import { SystemConfig } from 'src/config';
|
||||
import { OnEvent } from 'src/decorators';
|
||||
import { mapAsset } from 'src/dtos/asset-response.dto';
|
||||
import { AllJobStatusResponseDto, JobCommandDto, JobCreateDto, JobStatusDto } from 'src/dtos/job.dto';
|
||||
@@ -8,6 +9,8 @@ import {
|
||||
AssetType,
|
||||
AssetVisibility,
|
||||
BootstrapEventPriority,
|
||||
CronJob,
|
||||
DatabaseLock,
|
||||
ImmichWorker,
|
||||
JobCommand,
|
||||
JobName,
|
||||
@@ -20,6 +23,7 @@ import { ArgOf, ArgsOf } from 'src/repositories/event.repository';
|
||||
import { BaseService } from 'src/services/base.service';
|
||||
import { ConcurrentQueueName, JobItem } from 'src/types';
|
||||
import { hexOrBufferToBase64 } from 'src/utils/bytes';
|
||||
import { handlePromiseError } from 'src/utils/misc';
|
||||
|
||||
const asJobItem = (dto: JobCreateDto): JobItem => {
|
||||
switch (dto.name) {
|
||||
@@ -53,12 +57,59 @@ const asJobItem = (dto: JobCreateDto): JobItem => {
|
||||
}
|
||||
};
|
||||
|
||||
const asNightlyTasksCron = (config: SystemConfig) => {
|
||||
const [hours, minutes] = config.nightlyTasks.startTime.split(':').map(Number);
|
||||
return `${minutes} ${hours} * * *`;
|
||||
};
|
||||
|
||||
@Injectable()
|
||||
export class JobService extends BaseService {
|
||||
private services: ClassConstructor<unknown>[] = [];
|
||||
private nightlyJobsLock = false;
|
||||
|
||||
@OnEvent({ name: 'config.init', workers: [ImmichWorker.MICROSERVICES] })
|
||||
onConfigInit({ newConfig: config }: ArgOf<'config.init'>) {
|
||||
@OnEvent({ name: 'config.init' })
|
||||
async onConfigInit({ newConfig: config }: ArgOf<'config.init'>) {
|
||||
if (this.worker === ImmichWorker.MICROSERVICES) {
|
||||
this.updateQueueConcurrency(config);
|
||||
return;
|
||||
}
|
||||
|
||||
this.nightlyJobsLock = await this.databaseRepository.tryLock(DatabaseLock.NightlyJobs);
|
||||
if (this.nightlyJobsLock) {
|
||||
const cronExpression = asNightlyTasksCron(config);
|
||||
this.logger.debug(`Scheduling nightly jobs for ${cronExpression}`);
|
||||
this.cronRepository.create({
|
||||
name: CronJob.NightlyJobs,
|
||||
expression: cronExpression,
|
||||
start: true,
|
||||
onTick: () => handlePromiseError(this.handleNightlyJobs(), this.logger),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@OnEvent({ name: 'config.update', server: true })
|
||||
onConfigUpdate({ newConfig: config }: ArgOf<'config.update'>) {
|
||||
if (this.worker === ImmichWorker.MICROSERVICES) {
|
||||
this.updateQueueConcurrency(config);
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.nightlyJobsLock) {
|
||||
const cronExpression = asNightlyTasksCron(config);
|
||||
this.logger.debug(`Scheduling nightly jobs for ${cronExpression}`);
|
||||
this.cronRepository.update({ name: CronJob.NightlyJobs, expression: cronExpression, start: true });
|
||||
}
|
||||
}
|
||||
|
||||
@OnEvent({ name: 'app.bootstrap', priority: BootstrapEventPriority.JobService })
|
||||
onBootstrap() {
|
||||
this.jobRepository.setup(this.services);
|
||||
if (this.worker === ImmichWorker.MICROSERVICES) {
|
||||
this.jobRepository.startWorkers();
|
||||
}
|
||||
}
|
||||
|
||||
private updateQueueConcurrency(config: SystemConfig) {
|
||||
this.logger.debug(`Updating queue concurrency settings`);
|
||||
for (const queueName of Object.values(QueueName)) {
|
||||
let concurrency = 1;
|
||||
@@ -70,19 +121,6 @@ export class JobService extends BaseService {
|
||||
}
|
||||
}
|
||||
|
||||
@OnEvent({ name: 'config.update', server: true, workers: [ImmichWorker.MICROSERVICES] })
|
||||
onConfigUpdate({ newConfig: config }: ArgOf<'config.update'>) {
|
||||
this.onConfigInit({ newConfig: config });
|
||||
}
|
||||
|
||||
@OnEvent({ name: 'app.bootstrap', priority: BootstrapEventPriority.JobService })
|
||||
onBootstrap() {
|
||||
this.jobRepository.setup(this.services);
|
||||
if (this.worker === ImmichWorker.MICROSERVICES) {
|
||||
this.jobRepository.startWorkers();
|
||||
}
|
||||
}
|
||||
|
||||
setServices(services: ClassConstructor<unknown>[]) {
|
||||
this.services = services;
|
||||
}
|
||||
@@ -233,18 +271,37 @@ export class JobService extends BaseService {
|
||||
}
|
||||
|
||||
async handleNightlyJobs() {
|
||||
await this.jobRepository.queueAll([
|
||||
{ name: JobName.ASSET_DELETION_CHECK },
|
||||
{ name: JobName.USER_DELETE_CHECK },
|
||||
{ name: JobName.PERSON_CLEANUP },
|
||||
{ name: JobName.MEMORIES_CLEANUP },
|
||||
{ name: JobName.MEMORIES_CREATE },
|
||||
{ name: JobName.QUEUE_GENERATE_THUMBNAILS, data: { force: false } },
|
||||
{ name: JobName.CLEAN_OLD_AUDIT_LOGS },
|
||||
{ name: JobName.USER_SYNC_USAGE },
|
||||
{ name: JobName.QUEUE_FACIAL_RECOGNITION, data: { force: false, nightly: true } },
|
||||
{ name: JobName.CLEAN_OLD_SESSION_TOKENS },
|
||||
]);
|
||||
const config = await this.getConfig({ withCache: false });
|
||||
const jobs: JobItem[] = [];
|
||||
|
||||
if (config.nightlyTasks.databaseCleanup) {
|
||||
jobs.push(
|
||||
{ name: JobName.ASSET_DELETION_CHECK },
|
||||
{ name: JobName.USER_DELETE_CHECK },
|
||||
{ name: JobName.PERSON_CLEANUP },
|
||||
{ name: JobName.MEMORIES_CLEANUP },
|
||||
{ name: JobName.CLEAN_OLD_SESSION_TOKENS },
|
||||
{ name: JobName.CLEAN_OLD_AUDIT_LOGS },
|
||||
);
|
||||
}
|
||||
|
||||
if (config.nightlyTasks.generateMemories) {
|
||||
jobs.push({ name: JobName.MEMORIES_CREATE });
|
||||
}
|
||||
|
||||
if (config.nightlyTasks.syncQuotaUsage) {
|
||||
jobs.push({ name: JobName.USER_SYNC_USAGE });
|
||||
}
|
||||
|
||||
if (config.nightlyTasks.missingThumbnails) {
|
||||
jobs.push({ name: JobName.QUEUE_GENERATE_THUMBNAILS, data: { force: false } });
|
||||
}
|
||||
|
||||
if (config.nightlyTasks.clusterNewFaces) {
|
||||
jobs.push({ name: JobName.QUEUE_FACIAL_RECOGNITION, data: { force: false, nightly: true } });
|
||||
}
|
||||
|
||||
await this.jobRepository.queueAll(jobs);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user