da7a81b752
We would like to move away from the concept of finding and removing pending jobs. The only place this is used is for album update notifications, and this is done so that users who initially uploaded assets to an album will also receive a notification if someone else then adds assets to the same album. This can also be achieved with a job for each recipient. Multiple jobs also has the advantage that it will scale better for albums with many users, it's possible to send notifications concurrently, retries are possible without sending duplicate notifications, and it's clear what recipient a job failed for.
242 lines
7.4 KiB
TypeScript
242 lines
7.4 KiB
TypeScript
import { getQueueToken } from '@nestjs/bullmq';
|
|
import { Injectable } from '@nestjs/common';
|
|
import { ModuleRef, Reflector } from '@nestjs/core';
|
|
import { JobsOptions, Queue, Worker } from 'bullmq';
|
|
import { ClassConstructor } from 'class-transformer';
|
|
import { setTimeout } from 'node:timers/promises';
|
|
import { JobConfig } from 'src/decorators';
|
|
import { JobName, JobStatus, MetadataKey, QueueCleanType, QueueName } from 'src/enum';
|
|
import { ConfigRepository } from 'src/repositories/config.repository';
|
|
import { EventRepository } from 'src/repositories/event.repository';
|
|
import { LoggingRepository } from 'src/repositories/logging.repository';
|
|
import { JobCounts, JobItem, JobOf, QueueStatus } from 'src/types';
|
|
import { getKeyByValue, getMethodNames, ImmichStartupError } from 'src/utils/misc';
|
|
|
|
type JobMapItem = {
|
|
jobName: JobName;
|
|
queueName: QueueName;
|
|
handler: (job: JobOf<any>) => Promise<JobStatus>;
|
|
label: string;
|
|
};
|
|
|
|
@Injectable()
|
|
export class JobRepository {
|
|
private workers: Partial<Record<QueueName, Worker>> = {};
|
|
private handlers: Partial<Record<JobName, JobMapItem>> = {};
|
|
|
|
constructor(
|
|
private moduleRef: ModuleRef,
|
|
private configRepository: ConfigRepository,
|
|
private eventRepository: EventRepository,
|
|
private logger: LoggingRepository,
|
|
) {
|
|
this.logger.setContext(JobRepository.name);
|
|
}
|
|
|
|
setup(services: ClassConstructor<unknown>[]) {
|
|
const reflector = this.moduleRef.get(Reflector, { strict: false });
|
|
|
|
// discovery
|
|
for (const Service of services) {
|
|
const instance = this.moduleRef.get<any>(Service);
|
|
for (const methodName of getMethodNames(instance)) {
|
|
const handler = instance[methodName];
|
|
const config = reflector.get<JobConfig>(MetadataKey.JOB_CONFIG, handler);
|
|
if (!config) {
|
|
continue;
|
|
}
|
|
|
|
const { name: jobName, queue: queueName } = config;
|
|
const label = `${Service.name}.${handler.name}`;
|
|
|
|
// one handler per job
|
|
if (this.handlers[jobName]) {
|
|
const jobKey = getKeyByValue(JobName, jobName);
|
|
const errorMessage = `Failed to add job handler for ${label}`;
|
|
this.logger.error(
|
|
`${errorMessage}. JobName.${jobKey} is already handled by ${this.handlers[jobName].label}.`,
|
|
);
|
|
throw new ImmichStartupError(errorMessage);
|
|
}
|
|
|
|
this.handlers[jobName] = {
|
|
label,
|
|
jobName,
|
|
queueName,
|
|
handler: handler.bind(instance),
|
|
};
|
|
|
|
this.logger.verbose(`Added job handler: ${jobName} => ${label}`);
|
|
}
|
|
}
|
|
|
|
// no missing handlers
|
|
for (const [jobKey, jobName] of Object.entries(JobName)) {
|
|
const item = this.handlers[jobName];
|
|
if (!item) {
|
|
const errorMessage = `Failed to find job handler for Job.${jobKey} ("${jobName}")`;
|
|
this.logger.error(
|
|
`${errorMessage}. Make sure to add the @OnJob({ name: JobName.${jobKey}, queue: QueueName.XYZ }) decorator for the new job.`,
|
|
);
|
|
throw new ImmichStartupError(errorMessage);
|
|
}
|
|
}
|
|
}
|
|
|
|
startWorkers() {
|
|
const { bull } = this.configRepository.getEnv();
|
|
for (const queueName of Object.values(QueueName)) {
|
|
this.logger.debug(`Starting worker for queue: ${queueName}`);
|
|
this.workers[queueName] = new Worker(
|
|
queueName,
|
|
(job) => this.eventRepository.emit('job.start', queueName, job as JobItem),
|
|
{ ...bull.config, concurrency: 1 },
|
|
);
|
|
}
|
|
}
|
|
|
|
async run({ name, data }: JobItem) {
|
|
const item = this.handlers[name as JobName];
|
|
if (!item) {
|
|
this.logger.warn(`Skipping unknown job: "${name}"`);
|
|
return JobStatus.SKIPPED;
|
|
}
|
|
|
|
return item.handler(data);
|
|
}
|
|
|
|
setConcurrency(queueName: QueueName, concurrency: number) {
|
|
const worker = this.workers[queueName];
|
|
if (!worker) {
|
|
this.logger.warn(`Unable to set queue concurrency, worker not found: '${queueName}'`);
|
|
return;
|
|
}
|
|
|
|
worker.concurrency = concurrency;
|
|
}
|
|
|
|
async getQueueStatus(name: QueueName): Promise<QueueStatus> {
|
|
const queue = this.getQueue(name);
|
|
|
|
return {
|
|
isActive: !!(await queue.getActiveCount()),
|
|
isPaused: await queue.isPaused(),
|
|
};
|
|
}
|
|
|
|
pause(name: QueueName) {
|
|
return this.getQueue(name).pause();
|
|
}
|
|
|
|
resume(name: QueueName) {
|
|
return this.getQueue(name).resume();
|
|
}
|
|
|
|
empty(name: QueueName) {
|
|
return this.getQueue(name).drain();
|
|
}
|
|
|
|
clear(name: QueueName, type: QueueCleanType) {
|
|
return this.getQueue(name).clean(0, 1000, type);
|
|
}
|
|
|
|
getJobCounts(name: QueueName): Promise<JobCounts> {
|
|
return this.getQueue(name).getJobCounts(
|
|
'active',
|
|
'completed',
|
|
'failed',
|
|
'delayed',
|
|
'waiting',
|
|
'paused',
|
|
) as unknown as Promise<JobCounts>;
|
|
}
|
|
|
|
private getQueueName(name: JobName) {
|
|
return (this.handlers[name] as JobMapItem).queueName;
|
|
}
|
|
|
|
async queueAll(items: JobItem[]): Promise<void> {
|
|
if (items.length === 0) {
|
|
return;
|
|
}
|
|
|
|
const promises = [];
|
|
const itemsByQueue = {} as Record<string, (JobItem & { data: any; options: JobsOptions | undefined })[]>;
|
|
for (const item of items) {
|
|
const queueName = this.getQueueName(item.name);
|
|
const job = {
|
|
name: item.name,
|
|
data: item.data || {},
|
|
options: this.getJobOptions(item) || undefined,
|
|
} as JobItem & { data: any; options: JobsOptions | undefined };
|
|
|
|
if (job.options?.jobId) {
|
|
// need to use add() instead of addBulk() for jobId deduplication
|
|
promises.push(this.getQueue(queueName).add(item.name, item.data, job.options));
|
|
} else {
|
|
itemsByQueue[queueName] = itemsByQueue[queueName] || [];
|
|
itemsByQueue[queueName].push(job);
|
|
}
|
|
}
|
|
|
|
for (const [queueName, jobs] of Object.entries(itemsByQueue)) {
|
|
const queue = this.getQueue(queueName as QueueName);
|
|
promises.push(queue.addBulk(jobs));
|
|
}
|
|
|
|
await Promise.all(promises);
|
|
}
|
|
|
|
async queue(item: JobItem): Promise<void> {
|
|
return this.queueAll([item]);
|
|
}
|
|
|
|
async waitForQueueCompletion(...queues: QueueName[]): Promise<void> {
|
|
let activeQueue: QueueStatus | undefined;
|
|
do {
|
|
const statuses = await Promise.all(queues.map((name) => this.getQueueStatus(name)));
|
|
activeQueue = statuses.find((status) => status.isActive);
|
|
} while (activeQueue);
|
|
{
|
|
this.logger.verbose(`Waiting for ${activeQueue} queue to stop...`);
|
|
await setTimeout(1000);
|
|
}
|
|
}
|
|
|
|
private getJobOptions(item: JobItem): JobsOptions | null {
|
|
switch (item.name) {
|
|
case JobName.NOTIFY_ALBUM_UPDATE: {
|
|
return {
|
|
jobId: `${item.data.id}/${item.data.recipientId}`,
|
|
delay: item.data?.delay,
|
|
};
|
|
}
|
|
case JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE: {
|
|
return { jobId: item.data.id };
|
|
}
|
|
case JobName.GENERATE_PERSON_THUMBNAIL: {
|
|
return { priority: 1 };
|
|
}
|
|
case JobName.QUEUE_FACIAL_RECOGNITION: {
|
|
return { jobId: JobName.QUEUE_FACIAL_RECOGNITION };
|
|
}
|
|
default: {
|
|
return null;
|
|
}
|
|
}
|
|
}
|
|
|
|
private getQueue(queue: QueueName): Queue {
|
|
return this.moduleRef.get<Queue>(getQueueToken(queue), { strict: false });
|
|
}
|
|
|
|
/** @deprecated */
|
|
// todo: remove this when asset notifications no longer need it.
|
|
public async removeJob(name: JobName, jobID: string): Promise<void> {
|
|
const existingJob = await this.getQueue(this.getQueueName(name)).getJob(jobID);
|
|
if (existingJob) {
|
|
await existingJob.remove();
|
|
}
|
|
}
|
|
}
|