Compare commits

..

1 Commits

Author SHA1 Message Date
Thomas Way
d46e5f2436 feat: Use postgres as a queue
We've been keen to try this for a while as it means we can remove redis as a
dependency, which makes Immich easier to setup and run.

This replaces bullmq with a bespoke postgres queue. Jobs in the queue are
processed either immediately via triggers and notifications, or eventually if a
notification is missed.
2025-04-30 22:42:18 +01:00
13 changed files with 517 additions and 89 deletions

View File

@@ -33,6 +33,7 @@ services:
- ${UPLOAD_LOCATION}/photos/upload:/usr/src/app/upload/upload - ${UPLOAD_LOCATION}/photos/upload:/usr/src/app/upload/upload
- /usr/src/app/node_modules - /usr/src/app/node_modules
- /etc/localtime:/etc/localtime:ro - /etc/localtime:/etc/localtime:ro
- ../flickr30k-images:/flickr30k:ro
env_file: env_file:
- .env - .env
environment: environment:
@@ -147,25 +148,25 @@ services:
-c wal_compression=on -c wal_compression=on
# set IMMICH_TELEMETRY_INCLUDE=all in .env to enable metrics # set IMMICH_TELEMETRY_INCLUDE=all in .env to enable metrics
# immich-prometheus: immich-prometheus:
# container_name: immich_prometheus container_name: immich_prometheus
# ports: ports:
# - 9090:9090 - 9090:9090
# image: prom/prometheus image: prom/prometheus
# volumes: volumes:
# - ./prometheus.yml:/etc/prometheus/prometheus.yml - ./prometheus.yml:/etc/prometheus/prometheus.yml
# - prometheus-data:/prometheus - prometheus-data:/prometheus
# first login uses admin/admin # first login uses admin/admin
# add data source for http://immich-prometheus:9090 to get started # add data source for http://immich-prometheus:9090 to get started
# immich-grafana: immich-grafana:
# container_name: immich_grafana container_name: immich_grafana
# command: ['./run.sh', '-disable-reporting'] command: ['./run.sh', '-disable-reporting']
# ports: ports:
# - 3000:3000 - 3001:3000
# image: grafana/grafana:10.3.3-ubuntu image: grafana/grafana:10.3.3-ubuntu
# volumes: volumes:
# - grafana-data:/var/lib/grafana - grafana-data:/var/lib/grafana
volumes: volumes:
model-cache: model-cache:

View File

@@ -1,12 +1,14 @@
global: global:
scrape_interval: 15s scrape_interval: 3s
evaluation_interval: 15s evaluation_interval: 3s
scrape_configs: scrape_configs:
- job_name: immich_api - job_name: immich_api
scrape_interval: 3s
static_configs: static_configs:
- targets: ['immich-server:8081'] - targets: ["immich-server:8081"]
- job_name: immich_microservices - job_name: immich_microservices
scrape_interval:
static_configs: static_configs:
- targets: ['immich-server:8082'] - targets: ["immich-server:8082"]

View File

@@ -38,7 +38,7 @@
"fast-glob": "^3.3.2", "fast-glob": "^3.3.2",
"fluent-ffmpeg": "^2.1.2", "fluent-ffmpeg": "^2.1.2",
"geo-tz": "^8.0.0", "geo-tz": "^8.0.0",
"graphile-worker": "^0.17.0-canary.1fcb2a0", "graphile-worker": "^0.16.6",
"handlebars": "^4.7.8", "handlebars": "^4.7.8",
"i18n-iso-countries": "^7.6.0", "i18n-iso-countries": "^7.6.0",
"joi": "^17.10.0", "joi": "^17.10.0",
@@ -10085,16 +10085,16 @@
"license": "MIT" "license": "MIT"
}, },
"node_modules/graphile-worker": { "node_modules/graphile-worker": {
"version": "0.17.0-canary.1fcb2a0", "version": "0.16.6",
"resolved": "https://registry.npmjs.org/graphile-worker/-/graphile-worker-0.17.0-canary.1fcb2a0.tgz", "resolved": "https://registry.npmjs.org/graphile-worker/-/graphile-worker-0.16.6.tgz",
"integrity": "sha512-eG02GZ0U1eSMBdfHlQg9+jaNXpr9gs1cwqfFeney3BHpEMSvG3jw+7SdQJPVUgF8wnt8dRRfhkbpzaXGSOr+MQ==", "integrity": "sha512-e7gGYDmGqzju2l83MpzX8vNG/lOtVJiSzI3eZpAFubSxh/cxs7sRrRGBGjzBP1kNG0H+c95etPpNRNlH65PYhw==",
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"@graphile/logger": "^0.2.0", "@graphile/logger": "^0.2.0",
"@types/debug": "^4.1.10", "@types/debug": "^4.1.10",
"@types/pg": "^8.10.5", "@types/pg": "^8.10.5",
"cosmiconfig": "^8.3.6", "cosmiconfig": "^8.3.6",
"graphile-config": "^0.0.1-beta.14", "graphile-config": "^0.0.1-beta.4",
"json5": "^2.2.3", "json5": "^2.2.3",
"pg": "^8.11.3", "pg": "^8.11.3",
"tslib": "^2.6.2", "tslib": "^2.6.2",
@@ -10104,8 +10104,7 @@
"graphile-worker": "dist/cli.js" "graphile-worker": "dist/cli.js"
}, },
"engines": { "engines": {
"node": ">=14.0.0", "node": ">=14.0.0"
"yarn": "^1.22.22"
} }
}, },
"node_modules/handlebars": { "node_modules/handlebars": {

View File

@@ -63,7 +63,7 @@
"fast-glob": "^3.3.2", "fast-glob": "^3.3.2",
"fluent-ffmpeg": "^2.1.2", "fluent-ffmpeg": "^2.1.2",
"geo-tz": "^8.0.0", "geo-tz": "^8.0.0",
"graphile-worker": "^0.17.0-canary.1fcb2a0", "graphile-worker": "^0.16.6",
"handlebars": "^4.7.8", "handlebars": "^4.7.8",
"i18n-iso-countries": "^7.6.0", "i18n-iso-countries": "^7.6.0",
"joi": "^17.10.0", "joi": "^17.10.0",

View File

@@ -255,10 +255,10 @@ export class DatabaseRepository {
} }
} }
// if (error) { if (error) {
// this.logger.error(`Kysely migrations failed: ${error}`); this.logger.error(`Kysely migrations failed: ${error}`);
// throw error; throw error;
// } }
this.logger.debug('Finished running kysely migrations'); this.logger.debug('Finished running kysely migrations');
} }

View File

@@ -1,8 +1,9 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { ModuleRef, Reflector } from '@nestjs/core'; import { ModuleRef, Reflector } from '@nestjs/core';
import { ClassConstructor } from 'class-transformer'; import { ClassConstructor } from 'class-transformer';
import { AddJobsJobSpec, makeWorkerUtils, run, Runner, WorkerUtils } from 'graphile-worker'; import { makeWorkerUtils, run, Runner, TaskSpec, WorkerUtils } from 'graphile-worker';
import { Kysely } from 'kysely'; import { Kysely } from 'kysely';
import { DateTime, Duration } from 'luxon';
import { InjectKysely } from 'nestjs-kysely'; import { InjectKysely } from 'nestjs-kysely';
import pg, { PoolConfig } from 'pg'; import pg, { PoolConfig } from 'pg';
import { DB } from 'src/db'; import { DB } from 'src/db';
@@ -100,6 +101,8 @@ export class JobRepository {
waiting: ${pool.waitingCount}`); waiting: ${pool.waitingCount}`);
}, 5000); }, 5000);
pool.setMaxListeners(100);
pool.on('connect', (client) => { pool.on('connect', (client) => {
client.setMaxListeners(200); client.setMaxListeners(200);
}); });
@@ -128,7 +131,7 @@ export class JobRepository {
concurrency, concurrency,
taskList: { taskList: {
[queueName]: async (payload: unknown): Promise<void> => { [queueName]: async (payload: unknown): Promise<void> => {
// this.logger.log(`Job ${queueName} started with payload: ${JSON.stringify(payload)}`); this.logger.log(`Job ${queueName} started with payload: ${JSON.stringify(payload)}`);
await this.eventRepository.emit('job.start', queueName, payload as JobItem); await this.eventRepository.emit('job.start', queueName, payload as JobItem);
}, },
}, },
@@ -177,21 +180,21 @@ export class JobRepository {
return (this.handlers[name] as JobMapItem).queueName; return (this.handlers[name] as JobMapItem).queueName;
} }
run({ name, data }: JobItem): Promise<JobStatus> { async run({ name, data }: JobItem): Promise<JobStatus> {
const item = this.handlers[name as JobName]; const item = this.handlers[name as JobName];
if (!item) { if (!item) {
this.logger.warn(`Skipping unknown job: "${name}"`); this.logger.warn(`Skipping unknown job: "${name}"`);
return Promise.resolve(JobStatus.SKIPPED); return JobStatus.SKIPPED;
} }
return item.handler(data); return item.handler(data);
} }
queue(item: JobItem): Promise<unknown> { async queue(item: JobItem): Promise<void> {
return this.queueAll([item]); await this.workerUtils!.addJob(this.getQueueName(item.name), item, this.getJobOptions(item));
} }
queueAll(items: JobItem[]): Promise<unknown> { async queueAll(items: JobItem[]): Promise<void> {
return this.workerUtils!.addJobs(items.map((item) => this.getJobSpec(item))); await Promise.all(items.map((item) => this.queue(item)));
} }
// todo: are we actually generating sql // todo: are we actually generating sql
@@ -276,28 +279,23 @@ export class JobRepository {
return { paused: state?.[queueName]?.paused ?? false }; return { paused: state?.[queueName]?.paused ?? false };
} }
private getJobSpec(item: JobItem): AddJobsJobSpec { private getJobOptions(item: JobItem): TaskSpec | undefined {
const identifier = (this.handlers[item.name] as JobMapItem).queueName;
switch (item.name) { switch (item.name) {
case JobName.NOTIFY_ALBUM_UPDATE: { case JobName.NOTIFY_ALBUM_UPDATE: {
return { let runAt: Date | undefined;
identifier, if (item.data?.delay) {
payload: item, runAt = DateTime.now().plus(Duration.fromMillis(item.data.delay)).toJSDate();
jobKey: item.data.id, }
runAt: item.data?.delay ? new Date(Date.now() + item.data.delay) : undefined, return { jobKey: item.data.id, runAt };
};
} }
case JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE: { case JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE: {
return { identifier, payload: item, jobKey: QueueName.STORAGE_TEMPLATE_MIGRATION }; return { jobKey: QueueName.STORAGE_TEMPLATE_MIGRATION };
} }
case JobName.GENERATE_PERSON_THUMBNAIL: { case JobName.GENERATE_PERSON_THUMBNAIL: {
return { identifier, payload: item, priority: 1 }; return { priority: 1 };
} }
case JobName.QUEUE_FACIAL_RECOGNITION: { case JobName.QUEUE_FACIAL_RECOGNITION: {
return { identifier, payload: item, jobKey: JobName.QUEUE_FACIAL_RECOGNITION }; return { jobKey: JobName.QUEUE_FACIAL_RECOGNITION };
}
default: {
return { identifier, payload: item };
} }
} }
} }

View File

@@ -4,10 +4,11 @@ import { OnJob } from 'src/decorators';
import { mapAsset } from 'src/dtos/asset-response.dto'; import { mapAsset } from 'src/dtos/asset-response.dto';
import { AuthDto } from 'src/dtos/auth.dto'; import { AuthDto } from 'src/dtos/auth.dto';
import { DuplicateResponseDto } from 'src/dtos/duplicate.dto'; import { DuplicateResponseDto } from 'src/dtos/duplicate.dto';
import { JobName, JobStatus, QueueName } from 'src/enum'; import { AssetFileType, JobName, JobStatus, QueueName } from 'src/enum';
import { AssetDuplicateResult } from 'src/repositories/search.repository'; import { AssetDuplicateResult } from 'src/repositories/search.repository';
import { BaseService } from 'src/services/base.service'; import { BaseService } from 'src/services/base.service';
import { JobItem, JobOf } from 'src/types'; import { JobItem, JobOf } from 'src/types';
import { getAssetFile } from 'src/utils/asset.util';
import { isDuplicateDetectionEnabled } from 'src/utils/misc'; import { isDuplicateDetectionEnabled } from 'src/utils/misc';
@Injectable() @Injectable()
@@ -59,6 +60,49 @@ export class DuplicateService extends BaseService {
return JobStatus.FAILED; return JobStatus.FAILED;
} }
if (asset.stackId) {
this.logger.debug(`Asset ${id} is part of a stack, skipping`);
return JobStatus.SKIPPED;
}
if (!asset.isVisible) {
this.logger.debug(`Asset ${id} is not visible, skipping`);
return JobStatus.SKIPPED;
}
const previewFile = getAssetFile(asset.files || [], AssetFileType.PREVIEW);
if (!previewFile) {
this.logger.warn(`Asset ${id} is missing preview image`);
return JobStatus.FAILED;
}
if (!asset.embedding) {
this.logger.debug(`Asset ${id} is missing embedding`);
return JobStatus.FAILED;
}
const duplicateAssets = await this.searchRepository.searchDuplicates({
assetId: asset.id,
embedding: asset.embedding,
maxDistance: machineLearning.duplicateDetection.maxDistance,
type: asset.type,
userIds: [asset.ownerId],
});
let assetIds = [asset.id];
if (duplicateAssets.length > 0) {
this.logger.debug(
`Found ${duplicateAssets.length} duplicate${duplicateAssets.length === 1 ? '' : 's'} for asset ${asset.id}`,
);
assetIds = await this.updateDuplicates(asset, duplicateAssets);
} else if (asset.duplicateId) {
this.logger.debug(`No duplicates found for asset ${asset.id}, removing duplicateId`);
await this.assetRepository.update({ id: asset.id, duplicateId: null });
}
const duplicatesDetectedAt = new Date();
await this.assetRepository.upsertJobStatus(...assetIds.map((assetId) => ({ assetId, duplicatesDetectedAt })));
return JobStatus.SUCCESS; return JobStatus.SUCCESS;
} }

View File

@@ -5,6 +5,7 @@ import { OnEvent } from 'src/decorators';
import { mapAsset } from 'src/dtos/asset-response.dto'; import { mapAsset } from 'src/dtos/asset-response.dto';
import { AllJobStatusResponseDto, JobCommandDto, JobCreateDto, JobStatusDto } from 'src/dtos/job.dto'; import { AllJobStatusResponseDto, JobCommandDto, JobCreateDto, JobStatusDto } from 'src/dtos/job.dto';
import { import {
AssetType,
BootstrapEventPriority, BootstrapEventPriority,
ImmichWorker, ImmichWorker,
JobCommand, JobCommand,
@@ -130,7 +131,7 @@ export class JobService extends BaseService {
return response; return response;
} }
private async start(name: QueueName, { force }: JobCommandDto): Promise<unknown> { private async start(name: QueueName, { force }: JobCommandDto): Promise<void> {
const { active } = await this.jobRepository.getJobCounts(name); const { active } = await this.jobRepository.getJobCounts(name);
if (active > 0) { if (active > 0) {
throw new BadRequestException(`Jobs are already running`); throw new BadRequestException(`Jobs are already running`);
@@ -305,9 +306,12 @@ export class JobService extends BaseService {
const jobs: JobItem[] = [ const jobs: JobItem[] = [
{ name: JobName.SMART_SEARCH, data: item.data }, { name: JobName.SMART_SEARCH, data: item.data },
{ name: JobName.FACE_DETECTION, data: item.data }, { name: JobName.FACE_DETECTION, data: item.data },
{ name: JobName.VIDEO_CONVERSION, data: item.data },
]; ];
if (asset.type === AssetType.VIDEO) {
jobs.push({ name: JobName.VIDEO_CONVERSION, data: item.data });
}
await this.jobRepository.queueAll(jobs); await this.jobRepository.queueAll(jobs);
if (asset.isVisible) { if (asset.isVisible) {
this.eventRepository.clientSend('on_upload_success', asset.ownerId, mapAsset(asset)); this.eventRepository.clientSend('on_upload_success', asset.ownerId, mapAsset(asset));

View File

@@ -5,19 +5,24 @@ import { Exif } from 'src/database';
import { OnEvent, OnJob } from 'src/decorators'; import { OnEvent, OnJob } from 'src/decorators';
import { SystemConfigFFmpegDto } from 'src/dtos/system-config.dto'; import { SystemConfigFFmpegDto } from 'src/dtos/system-config.dto';
import { import {
AssetFileType,
AssetPathType, AssetPathType,
AssetType,
AudioCodec, AudioCodec,
Colorspace, Colorspace,
JobName, JobName,
JobStatus, JobStatus,
LogLevel,
QueueName, QueueName,
RawExtractedFormat, RawExtractedFormat,
StorageFolder, StorageFolder,
TranscodeHWAccel,
TranscodePolicy, TranscodePolicy,
TranscodeTarget, TranscodeTarget,
VideoCodec, VideoCodec,
VideoContainer VideoContainer,
} from 'src/enum'; } from 'src/enum';
import { UpsertFileOptions } from 'src/repositories/asset.repository';
import { BaseService } from 'src/services/base.service'; import { BaseService } from 'src/services/base.service';
import { import {
AudioStreamInfo, AudioStreamInfo,
@@ -29,7 +34,7 @@ import {
VideoStreamInfo, VideoStreamInfo,
} from 'src/types'; } from 'src/types';
import { getAssetFiles } from 'src/utils/asset.util'; import { getAssetFiles } from 'src/utils/asset.util';
import { ThumbnailConfig } from 'src/utils/media'; import { BaseConfig, ThumbnailConfig } from 'src/utils/media';
import { mimeTypes } from 'src/utils/mime-types'; import { mimeTypes } from 'src/utils/mime-types';
@Injectable() @Injectable()
@@ -44,25 +49,26 @@ export class MediaService extends BaseService {
@OnJob({ name: JobName.QUEUE_GENERATE_THUMBNAILS, queue: QueueName.THUMBNAIL_GENERATION }) @OnJob({ name: JobName.QUEUE_GENERATE_THUMBNAILS, queue: QueueName.THUMBNAIL_GENERATION })
async handleQueueGenerateThumbnails({ force }: JobOf<JobName.QUEUE_GENERATE_THUMBNAILS>): Promise<JobStatus> { async handleQueueGenerateThumbnails({ force }: JobOf<JobName.QUEUE_GENERATE_THUMBNAILS>): Promise<JobStatus> {
for (let i = 0; i < 10; i++) { let jobs: JobItem[] = [];
let thumbJobs: JobItem[] = [];
for await (const asset of this.assetJobRepository.streamForThumbnailJob(!!force)) {
const { previewFile, thumbnailFile } = getAssetFiles(asset.files);
if (!previewFile || !thumbnailFile || !asset.thumbhash || force) { const queueAll = async () => {
thumbJobs.push({ name: JobName.GENERATE_THUMBNAILS, data: { id: asset.id } }); await this.jobRepository.queueAll(jobs);
continue; jobs = [];
} };
if (thumbJobs.length >= JOBS_ASSET_PAGINATION_SIZE) { for await (const asset of this.assetJobRepository.streamForThumbnailJob(!!force)) {
await this.jobRepository.queueAll(thumbJobs); const { previewFile, thumbnailFile } = getAssetFiles(asset.files);
thumbJobs = [];
} if (!previewFile || !thumbnailFile || !asset.thumbhash || force) {
jobs.push({ name: JobName.GENERATE_THUMBNAILS, data: { id: asset.id } });
}
if (jobs.length >= JOBS_ASSET_PAGINATION_SIZE) {
await queueAll();
} }
await this.jobRepository.queueAll(thumbJobs);
} }
const jobs: JobItem[] = []; await queueAll();
const people = this.personRepository.getAll(force ? undefined : { thumbnailPath: '' }); const people = this.personRepository.getAll(force ? undefined : { thumbnailPath: '' });
@@ -77,9 +83,12 @@ export class MediaService extends BaseService {
} }
jobs.push({ name: JobName.GENERATE_PERSON_THUMBNAIL, data: { id: person.id } }); jobs.push({ name: JobName.GENERATE_PERSON_THUMBNAIL, data: { id: person.id } });
if (jobs.length >= JOBS_ASSET_PAGINATION_SIZE) {
await queueAll();
}
} }
await this.jobRepository.queueAll(jobs); await queueAll();
return JobStatus.SUCCESS; return JobStatus.SUCCESS;
} }
@@ -142,6 +151,75 @@ export class MediaService extends BaseService {
this.logger.warn(`Thumbnail generation failed for asset ${id}: not found`); this.logger.warn(`Thumbnail generation failed for asset ${id}: not found`);
return JobStatus.FAILED; return JobStatus.FAILED;
} }
if (!asset.isVisible) {
this.logger.verbose(`Thumbnail generation skipped for asset ${id}: not visible`);
return JobStatus.SKIPPED;
}
let generated: {
previewPath: string;
thumbnailPath: string;
fullsizePath?: string;
thumbhash: Buffer;
};
if (asset.type === AssetType.VIDEO || asset.originalFileName.toLowerCase().endsWith('.gif')) {
generated = await this.generateVideoThumbnails(asset);
} else if (asset.type === AssetType.IMAGE) {
generated = await this.generateImageThumbnails(asset);
} else {
this.logger.warn(`Skipping thumbnail generation for asset ${id}: ${asset.type} is not an image or video`);
return JobStatus.SKIPPED;
}
const { previewFile, thumbnailFile, fullsizeFile } = getAssetFiles(asset.files);
const toUpsert: UpsertFileOptions[] = [];
if (previewFile?.path !== generated.previewPath) {
toUpsert.push({ assetId: asset.id, path: generated.previewPath, type: AssetFileType.PREVIEW });
}
if (thumbnailFile?.path !== generated.thumbnailPath) {
toUpsert.push({ assetId: asset.id, path: generated.thumbnailPath, type: AssetFileType.THUMBNAIL });
}
if (generated.fullsizePath && fullsizeFile?.path !== generated.fullsizePath) {
toUpsert.push({ assetId: asset.id, path: generated.fullsizePath, type: AssetFileType.FULLSIZE });
}
if (toUpsert.length > 0) {
await this.assetRepository.upsertFiles(toUpsert);
}
const pathsToDelete: string[] = [];
if (previewFile && previewFile.path !== generated.previewPath) {
this.logger.debug(`Deleting old preview for asset ${asset.id}`);
pathsToDelete.push(previewFile.path);
}
if (thumbnailFile && thumbnailFile.path !== generated.thumbnailPath) {
this.logger.debug(`Deleting old thumbnail for asset ${asset.id}`);
pathsToDelete.push(thumbnailFile.path);
}
if (fullsizeFile && fullsizeFile.path !== generated.fullsizePath) {
this.logger.debug(`Deleting old fullsize preview image for asset ${asset.id}`);
pathsToDelete.push(fullsizeFile.path);
if (!generated.fullsizePath) {
// did not generate a new fullsize image, delete the existing record
await this.assetRepository.deleteFiles([fullsizeFile]);
}
}
if (pathsToDelete.length > 0) {
await Promise.all(pathsToDelete.map((path) => this.storageRepository.unlink(path)));
}
if (!asset.thumbhash || Buffer.compare(asset.thumbhash, generated.thumbhash) !== 0) {
await this.assetRepository.update({ id: asset.id, thumbhash: generated.thumbhash });
}
await this.assetRepository.upsertJobStatus({ assetId: asset.id, previewAt: new Date(), thumbnailAt: new Date() });
return JobStatus.SUCCESS; return JobStatus.SUCCESS;
} }
@@ -286,6 +364,81 @@ export class MediaService extends BaseService {
if (!asset) { if (!asset) {
return JobStatus.FAILED; return JobStatus.FAILED;
} }
const input = asset.originalPath;
const output = StorageCore.getEncodedVideoPath(asset);
this.storageCore.ensureFolders(output);
const { videoStreams, audioStreams, format } = await this.mediaRepository.probe(input, {
countFrames: this.logger.isLevelEnabled(LogLevel.DEBUG), // makes frame count more reliable for progress logs
});
const videoStream = this.getMainStream(videoStreams);
const audioStream = this.getMainStream(audioStreams);
if (!videoStream || !format.formatName) {
return JobStatus.FAILED;
}
if (!videoStream.height || !videoStream.width) {
this.logger.warn(`Skipped transcoding for asset ${asset.id}: no video streams found`);
return JobStatus.FAILED;
}
let { ffmpeg } = await this.getConfig({ withCache: true });
const target = this.getTranscodeTarget(ffmpeg, videoStream, audioStream);
if (target === TranscodeTarget.NONE && !this.isRemuxRequired(ffmpeg, format)) {
if (asset.encodedVideoPath) {
this.logger.log(`Transcoded video exists for asset ${asset.id}, but is no longer required. Deleting...`);
await this.jobRepository.queue({ name: JobName.DELETE_FILES, data: { files: [asset.encodedVideoPath] } });
await this.assetRepository.update({ id: asset.id, encodedVideoPath: null });
} else {
this.logger.verbose(`Asset ${asset.id} does not require transcoding based on current policy, skipping`);
}
return JobStatus.SKIPPED;
}
const command = BaseConfig.create(ffmpeg, this.videoInterfaces).getCommand(target, videoStream, audioStream);
if (ffmpeg.accel === TranscodeHWAccel.DISABLED) {
this.logger.log(`Transcoding video ${asset.id} without hardware acceleration`);
} else {
this.logger.log(
`Transcoding video ${asset.id} with ${ffmpeg.accel.toUpperCase()}-accelerated encoding and${ffmpeg.accelDecode ? '' : ' software'} decoding`,
);
}
try {
await this.mediaRepository.transcode(input, output, command);
} catch (error: any) {
this.logger.error(`Error occurred during transcoding: ${error.message}`);
if (ffmpeg.accel === TranscodeHWAccel.DISABLED) {
return JobStatus.FAILED;
}
let partialFallbackSuccess = false;
if (ffmpeg.accelDecode) {
try {
this.logger.error(`Retrying with ${ffmpeg.accel.toUpperCase()}-accelerated encoding and software decoding`);
ffmpeg = { ...ffmpeg, accelDecode: false };
const command = BaseConfig.create(ffmpeg, this.videoInterfaces).getCommand(target, videoStream, audioStream);
await this.mediaRepository.transcode(input, output, command);
partialFallbackSuccess = true;
} catch (error: any) {
this.logger.error(`Error occurred during transcoding: ${error.message}`);
}
}
if (!partialFallbackSuccess) {
this.logger.error(`Retrying with ${ffmpeg.accel.toUpperCase()} acceleration disabled`);
ffmpeg = { ...ffmpeg, accel: TranscodeHWAccel.DISABLED };
const command = BaseConfig.create(ffmpeg, this.videoInterfaces).getCommand(target, videoStream, audioStream);
await this.mediaRepository.transcode(input, output, command);
}
}
this.logger.log(`Successfully encoded ${asset.id}`);
await this.assetRepository.update({ id: asset.id, encodedVideoPath: output });
return JobStatus.SUCCESS; return JobStatus.SUCCESS;
} }

View File

@@ -23,9 +23,11 @@ import {
SourceType, SourceType,
} from 'src/enum'; } from 'src/enum';
import { ArgOf } from 'src/repositories/event.repository'; import { ArgOf } from 'src/repositories/event.repository';
import { ReverseGeocodeResult } from 'src/repositories/map.repository';
import { ImmichTags } from 'src/repositories/metadata.repository'; import { ImmichTags } from 'src/repositories/metadata.repository';
import { BaseService } from 'src/services/base.service'; import { BaseService } from 'src/services/base.service';
import { JobItem, JobOf } from 'src/types'; import { JobItem, JobOf } from 'src/types';
import { isFaceImportEnabled } from 'src/utils/misc';
import { upsertTags } from 'src/utils/tag'; import { upsertTags } from 'src/utils/tag';
/** look for a date from these tags (in order) */ /** look for a date from these tags (in order) */
@@ -163,20 +165,17 @@ export class MetadataService extends BaseService {
async handleQueueMetadataExtraction(job: JobOf<JobName.QUEUE_METADATA_EXTRACTION>): Promise<JobStatus> { async handleQueueMetadataExtraction(job: JobOf<JobName.QUEUE_METADATA_EXTRACTION>): Promise<JobStatus> {
const { force } = job; const { force } = job;
for (let i = 0; i < 10; i++) { let queue: { name: JobName.METADATA_EXTRACTION; data: { id: string } }[] = [];
let queue: { name: JobName.METADATA_EXTRACTION; data: { id: string } }[] = []; for await (const asset of this.assetJobRepository.streamForMetadataExtraction(force)) {
for await (const asset of this.assetJobRepository.streamForMetadataExtraction(force)) { queue.push({ name: JobName.METADATA_EXTRACTION, data: { id: asset.id } });
queue.push({ name: JobName.METADATA_EXTRACTION, data: { id: asset.id, source: 'upload' } as any });
if (queue.length >= JOBS_ASSET_PAGINATION_SIZE) { if (queue.length >= JOBS_ASSET_PAGINATION_SIZE) {
await this.jobRepository.queueAll(queue); await this.jobRepository.queueAll(queue);
queue = []; queue = [];
}
} }
await this.jobRepository.queueAll(queue);
} }
await this.jobRepository.queueAll(queue);
return JobStatus.SUCCESS; return JobStatus.SUCCESS;
} }
@@ -190,6 +189,98 @@ export class MetadataService extends BaseService {
if (!asset) { if (!asset) {
return JobStatus.FAILED; return JobStatus.FAILED;
} }
const [exifTags, stats] = await Promise.all([
this.getExifTags(asset),
this.storageRepository.stat(asset.originalPath),
]);
this.logger.verbose('Exif Tags', exifTags);
const dates = this.getDates(asset, exifTags, stats);
const { width, height } = this.getImageDimensions(exifTags);
let geo: ReverseGeocodeResult = { country: null, state: null, city: null },
latitude: number | null = null,
longitude: number | null = null;
if (this.hasGeo(exifTags)) {
latitude = exifTags.GPSLatitude;
longitude = exifTags.GPSLongitude;
if (reverseGeocoding.enabled) {
geo = await this.mapRepository.reverseGeocode({ latitude, longitude });
}
}
const exifData: Insertable<Exif> = {
assetId: asset.id,
// dates
dateTimeOriginal: dates.dateTimeOriginal,
modifyDate: stats.mtime,
timeZone: dates.timeZone,
// gps
latitude,
longitude,
country: geo.country,
state: geo.state,
city: geo.city,
// image/file
fileSizeInByte: stats.size,
exifImageHeight: validate(height),
exifImageWidth: validate(width),
orientation: validate(exifTags.Orientation)?.toString() ?? null,
projectionType: exifTags.ProjectionType ? String(exifTags.ProjectionType).toUpperCase() : null,
bitsPerSample: this.getBitsPerSample(exifTags),
colorspace: exifTags.ColorSpace ?? null,
// camera
make: exifTags.Make ?? exifTags?.Device?.Manufacturer ?? exifTags.AndroidMake ?? null,
model: exifTags.Model ?? exifTags?.Device?.ModelName ?? exifTags.AndroidModel ?? null,
fps: validate(Number.parseFloat(exifTags.VideoFrameRate!)),
iso: validate(exifTags.ISO) as number,
exposureTime: exifTags.ExposureTime ?? null,
lensModel: getLensModel(exifTags),
fNumber: validate(exifTags.FNumber),
focalLength: validate(exifTags.FocalLength),
// comments
description: String(exifTags.ImageDescription || exifTags.Description || '').trim(),
profileDescription: exifTags.ProfileDescription || null,
rating: validateRange(exifTags.Rating, -1, 5),
// grouping
livePhotoCID: (exifTags.ContentIdentifier || exifTags.MediaGroupUUID) ?? null,
autoStackId: this.getAutoStackId(exifTags),
};
const promises: Promise<unknown>[] = [
this.assetRepository.upsertExif(exifData),
this.assetRepository.update({
id: asset.id,
duration: exifTags.Duration?.toString() ?? null,
localDateTime: dates.localDateTime,
fileCreatedAt: dates.dateTimeOriginal ?? undefined,
fileModifiedAt: stats.mtime,
}),
this.applyTagList(asset, exifTags),
];
if (this.isMotionPhoto(asset, exifTags)) {
promises.push(this.applyMotionPhotos(asset, exifTags, dates, stats));
}
if (isFaceImportEnabled(metadata) && this.hasTaggedFaces(exifTags)) {
promises.push(this.applyTaggedFaces(asset, exifTags));
}
await Promise.all(promises);
if (exifData.livePhotoCID) {
await this.linkLivePhotos(asset, exifData);
}
await this.assetRepository.upsertJobStatus({ assetId: asset.id, metadataExtractedAt: new Date() });
return JobStatus.SUCCESS; return JobStatus.SUCCESS;
} }
@@ -334,9 +425,9 @@ export class MetadataService extends BaseService {
typeof tag === 'number' typeof tag === 'number'
? String(tag) ? String(tag)
: tag : tag
.split('|') .split('|')
.map((tag) => tag.replaceAll('/', '|')) .map((tag) => tag.replaceAll('/', '|'))
.join('/'), .join('/'),
); );
} else if (exifTags.Keywords) { } else if (exifTags.Keywords) {
let keywords = exifTags.Keywords; let keywords = exifTags.Keywords;

View File

@@ -1,7 +1,9 @@
import { BadRequestException, Injectable, NotFoundException } from '@nestjs/common'; import { BadRequestException, Injectable, NotFoundException } from '@nestjs/common';
import { Updateable } from 'kysely'; import { Insertable, Updateable } from 'kysely';
import { JOBS_ASSET_PAGINATION_SIZE } from 'src/constants'; import { FACE_THUMBNAIL_SIZE, JOBS_ASSET_PAGINATION_SIZE } from 'src/constants';
import { StorageCore } from 'src/cores/storage.core';
import { Person } from 'src/database'; import { Person } from 'src/database';
import { AssetFaces, FaceSearch } from 'src/db';
import { Chunked, OnJob } from 'src/decorators'; import { Chunked, OnJob } from 'src/decorators';
import { BulkIdErrorReason, BulkIdResponseDto } from 'src/dtos/asset-ids.response.dto'; import { BulkIdErrorReason, BulkIdResponseDto } from 'src/dtos/asset-ids.response.dto';
import { AuthDto } from 'src/dtos/auth.dto'; import { AuthDto } from 'src/dtos/auth.dto';
@@ -25,6 +27,7 @@ import {
import { import {
AssetType, AssetType,
CacheControl, CacheControl,
ImageFormat,
JobName, JobName,
JobStatus, JobStatus,
Permission, Permission,
@@ -288,6 +291,79 @@ export class PersonService extends BaseService {
} }
const asset = await this.assetJobRepository.getForDetectFacesJob(id); const asset = await this.assetJobRepository.getForDetectFacesJob(id);
const previewFile = asset?.files[0];
if (!asset || asset.files.length !== 1 || !previewFile) {
return JobStatus.FAILED;
}
if (!asset.isVisible) {
return JobStatus.SKIPPED;
}
const { imageHeight, imageWidth, faces } = await this.machineLearningRepository.detectFaces(
machineLearning.urls,
previewFile.path,
machineLearning.facialRecognition,
);
this.logger.debug(`${faces.length} faces detected in ${previewFile.path}`);
const facesToAdd: (Insertable<AssetFaces> & { id: string })[] = [];
const embeddings: FaceSearch[] = [];
const mlFaceIds = new Set<string>();
for (const face of asset.faces) {
if (face.sourceType === SourceType.MACHINE_LEARNING) {
mlFaceIds.add(face.id);
}
}
const heightScale = imageHeight / (asset.faces[0]?.imageHeight || 1);
const widthScale = imageWidth / (asset.faces[0]?.imageWidth || 1);
for (const { boundingBox, embedding } of faces) {
const scaledBox = {
x1: boundingBox.x1 * widthScale,
y1: boundingBox.y1 * heightScale,
x2: boundingBox.x2 * widthScale,
y2: boundingBox.y2 * heightScale,
};
const match = asset.faces.find((face) => this.iou(face, scaledBox) > 0.5);
if (match && !mlFaceIds.delete(match.id)) {
embeddings.push({ faceId: match.id, embedding });
} else if (!match) {
const faceId = this.cryptoRepository.randomUUID();
facesToAdd.push({
id: faceId,
assetId: asset.id,
imageHeight,
imageWidth,
boundingBoxX1: boundingBox.x1,
boundingBoxY1: boundingBox.y1,
boundingBoxX2: boundingBox.x2,
boundingBoxY2: boundingBox.y2,
});
embeddings.push({ faceId, embedding });
}
}
const faceIdsToRemove = [...mlFaceIds];
if (facesToAdd.length > 0 || faceIdsToRemove.length > 0 || embeddings.length > 0) {
await this.personRepository.refreshFaces(facesToAdd, faceIdsToRemove, embeddings);
}
if (faceIdsToRemove.length > 0) {
this.logger.log(`Removed ${faceIdsToRemove.length} faces below detection threshold in asset ${id}`);
}
if (facesToAdd.length > 0) {
this.logger.log(`Detected ${facesToAdd.length} new faces in asset ${id}`);
const jobs = facesToAdd.map((face) => ({ name: JobName.FACIAL_RECOGNITION, data: { id: face.id } }) as const);
await this.jobRepository.queueAll([{ name: JobName.QUEUE_FACIAL_RECOGNITION, data: { force: false } }, ...jobs]);
} else if (embeddings.length > 0) {
this.logger.log(`Added ${embeddings.length} face embeddings for asset ${id}`);
}
await this.assetRepository.upsertJobStatus({ assetId: asset.id, facesRecognizedAt: new Date() });
return JobStatus.SUCCESS; return JobStatus.SUCCESS;
} }
@@ -471,6 +547,26 @@ export class PersonService extends BaseService {
this.logger.error(`Could not generate person thumbnail for ${id}: missing data`); this.logger.error(`Could not generate person thumbnail for ${id}: missing data`);
return JobStatus.FAILED; return JobStatus.FAILED;
} }
const { ownerId, x1, y1, x2, y2, oldWidth, oldHeight } = data;
const { width, height, inputPath } = await this.getInputDimensions(data);
const thumbnailPath = StorageCore.getPersonThumbnailPath({ id, ownerId });
this.storageCore.ensureFolders(thumbnailPath);
const thumbnailOptions = {
colorspace: image.colorspace,
format: ImageFormat.JPEG,
size: FACE_THUMBNAIL_SIZE,
quality: image.thumbnail.quality,
crop: this.getCrop({ old: { width: oldWidth, height: oldHeight }, new: { width, height } }, { x1, y1, x2, y2 }),
processInvalidImages: process.env.IMMICH_PROCESS_INVALID_IMAGES === 'true',
};
await this.mediaRepository.generateThumbnail(inputPath, thumbnailOptions, thumbnailPath);
await this.personRepository.update({ id, thumbnailPath });
return JobStatus.SUCCESS; return JobStatus.SUCCESS;
} }

View File

@@ -103,6 +103,30 @@ export class SmartInfoService extends BaseService {
if (!asset || asset.files.length !== 1) { if (!asset || asset.files.length !== 1) {
return JobStatus.FAILED; return JobStatus.FAILED;
} }
if (!asset.isVisible) {
return JobStatus.SKIPPED;
}
const embedding = await this.machineLearningRepository.encodeImage(
machineLearning.urls,
asset.files[0].path,
machineLearning.clip,
);
if (this.databaseRepository.isBusy(DatabaseLock.CLIPDimSize)) {
this.logger.verbose(`Waiting for CLIP dimension size to be updated`);
await this.databaseRepository.wait(DatabaseLock.CLIPDimSize);
}
const newConfig = await this.getConfig({ withCache: true });
if (machineLearning.clip.modelName !== newConfig.machineLearning.clip.modelName) {
// Skip the job if the the model has changed since the embedding was generated.
return JobStatus.SKIPPED;
}
await this.searchRepository.upsert(asset.id, embedding);
return JobStatus.SUCCESS; return JobStatus.SUCCESS;
} }
} }

View File

@@ -10,6 +10,7 @@ import { AssetPathType, AssetType, DatabaseLock, JobName, JobStatus, QueueName,
import { ArgOf } from 'src/repositories/event.repository'; import { ArgOf } from 'src/repositories/event.repository';
import { BaseService } from 'src/services/base.service'; import { BaseService } from 'src/services/base.service';
import { JobOf, StorageAsset } from 'src/types'; import { JobOf, StorageAsset } from 'src/types';
import { getLivePhotoMotionFilename } from 'src/utils/file';
const storageTokens = { const storageTokens = {
secondOptions: ['s', 'ss', 'SSS'], secondOptions: ['s', 'ss', 'SSS'],
@@ -127,6 +128,21 @@ export class StorageTemplateService extends BaseService {
if (!asset) { if (!asset) {
return JobStatus.FAILED; return JobStatus.FAILED;
} }
const user = await this.userRepository.get(asset.ownerId, {});
const storageLabel = user?.storageLabel || null;
const filename = asset.originalFileName || asset.id;
await this.moveAsset(asset, { storageLabel, filename });
// move motion part of live photo
if (asset.livePhotoVideoId) {
const livePhotoVideo = await this.assetJobRepository.getForStorageTemplateJob(asset.livePhotoVideoId);
if (!livePhotoVideo) {
return JobStatus.FAILED;
}
const motionFilename = getLivePhotoMotionFilename(filename, livePhotoVideo.originalPath);
await this.moveAsset(livePhotoVideo, { storageLabel, filename: motionFilename });
}
return JobStatus.SUCCESS; return JobStatus.SUCCESS;
} }