feat(server): library refresh go brrr (#14456)

* feat: brr

---------
Co-authored-by: mertalev <101130780+mertalev@users.noreply.github.com>
This commit is contained in:
Jonathan Jogenfors
2025-03-06 16:00:18 +01:00
committed by GitHub
parent bc61497461
commit 3af26ee94a
15 changed files with 855 additions and 531 deletions
+309 -185
View File
@@ -1,5 +1,6 @@
import { BadRequestException, Injectable } from '@nestjs/common';
import { R_OK } from 'node:constants';
import { Stats } from 'node:fs';
import path, { basename, isAbsolute, parse } from 'node:path';
import picomatch from 'picomatch';
import { JOBS_LIBRARY_PAGINATION_SIZE } from 'src/constants';
@@ -16,9 +17,9 @@ import {
ValidateLibraryResponseDto,
} from 'src/dtos/library.dto';
import { AssetEntity } from 'src/entities/asset.entity';
import { LibraryEntity } from 'src/entities/library.entity';
import { AssetType, DatabaseLock, ImmichWorker, JobName, JobStatus, QueueName } from 'src/enum';
import { AssetStatus, AssetType, DatabaseLock, ImmichWorker, JobName, JobStatus, QueueName } from 'src/enum';
import { ArgOf } from 'src/repositories/event.repository';
import { AssetSyncResult } from 'src/repositories/library.repository';
import { BaseService } from 'src/services/base.service';
import { JobOf } from 'src/types';
import { mimeTypes } from 'src/utils/mime-types';
@@ -98,6 +99,26 @@ export class LibraryService extends BaseService {
let _resolve: () => void;
const ready$ = new Promise<void>((resolve) => (_resolve = resolve));
const handler = async (event: string, path: string) => {
if (matcher(path)) {
this.logger.debug(`File ${event} event received for ${path} in library ${library.id}}`);
await this.jobRepository.queue({
name: JobName.LIBRARY_SYNC_FILES,
data: { libraryId: library.id, paths: [path] },
});
} else {
this.logger.verbose(`Ignoring file ${event} event for ${path} in library ${library.id}`);
}
};
const deletionHandler = async (path: string) => {
this.logger.debug(`File unlink event received for ${path} in library ${library.id}}`);
await this.jobRepository.queue({
name: JobName.LIBRARY_ASSET_REMOVAL,
data: { libraryId: library.id, paths: [path] },
});
};
this.watchers[id] = this.storageRepository.watch(
library.importPaths,
{
@@ -107,43 +128,13 @@ export class LibraryService extends BaseService {
{
onReady: () => _resolve(),
onAdd: (path) => {
const handler = async () => {
this.logger.debug(`File add event received for ${path} in library ${library.id}}`);
if (matcher(path)) {
const asset = await this.assetRepository.getByLibraryIdAndOriginalPath(library.id, path);
if (asset) {
await this.syncAssets(library, [asset.id]);
}
if (matcher(path)) {
await this.syncFiles(library, [path]);
}
}
};
return handlePromiseError(handler(), this.logger);
return handlePromiseError(handler('add', path), this.logger);
},
onChange: (path) => {
const handler = async () => {
this.logger.debug(`Detected file change for ${path} in library ${library.id}`);
const asset = await this.assetRepository.getByLibraryIdAndOriginalPath(library.id, path);
if (asset) {
await this.syncAssets(library, [asset.id]);
}
if (matcher(path)) {
// Note: if the changed file was not previously imported, it will be imported now.
await this.syncFiles(library, [path]);
}
};
return handlePromiseError(handler(), this.logger);
return handlePromiseError(handler('change', path), this.logger);
},
onUnlink: (path) => {
const handler = async () => {
this.logger.debug(`Detected deleted file at ${path} in library ${library.id}`);
const asset = await this.assetRepository.getByLibraryIdAndOriginalPath(library.id, path);
if (asset) {
await this.syncAssets(library, [asset.id]);
}
};
return handlePromiseError(handler(), this.logger);
return handlePromiseError(deletionHandler(path), this.logger);
},
onError: (error) => {
this.logger.error(`Library watcher for library ${library.id} encountered error: ${error}`);
@@ -234,26 +225,38 @@ export class LibraryService extends BaseService {
return mapLibrary(library);
}
private async syncFiles({ id, ownerId }: LibraryEntity, assetPaths: string[]) {
await this.jobRepository.queueAll(
assetPaths.map((assetPath) => ({
name: JobName.LIBRARY_SYNC_FILE,
data: {
id,
assetPath,
ownerId,
},
})),
);
}
@OnJob({ name: JobName.LIBRARY_SYNC_FILES, queue: QueueName.LIBRARY })
async handleSyncFiles(job: JobOf<JobName.LIBRARY_SYNC_FILES>): Promise<JobStatus> {
const library = await this.libraryRepository.get(job.libraryId);
// We need to check if the library still exists as it could have been deleted after the scan was queued
if (!library) {
this.logger.debug(`Library ${job.libraryId} not found, skipping file import`);
return JobStatus.FAILED;
} else if (library.deletedAt) {
this.logger.debug(`Library ${job.libraryId} is deleted, won't import assets into it`);
return JobStatus.FAILED;
}
private async syncAssets({ importPaths, exclusionPatterns }: LibraryEntity, assetIds: string[]) {
await this.jobRepository.queueAll(
assetIds.map((assetId) => ({
name: JobName.LIBRARY_SYNC_ASSET,
data: { id: assetId, importPaths, exclusionPatterns },
})),
);
const assetImports = job.paths.map((assetPath) => this.processEntity(assetPath, library.ownerId, job.libraryId));
const assetIds: string[] = [];
for (let i = 0; i < assetImports.length; i += 5000) {
// Chunk the imports to avoid the postgres limit of max parameters at once
const chunk = assetImports.slice(i, i + 5000);
await this.assetRepository.createAll(chunk).then((assets) => assetIds.push(...assets.map((asset) => asset.id)));
}
const progressMessage =
job.progressCounter && job.totalAssets
? `(${job.progressCounter} of ${job.totalAssets})`
: `(${job.progressCounter} done so far)`;
this.logger.log(`Imported ${assetIds.length} ${progressMessage} file(s) into library ${job.libraryId}`);
await this.queuePostSyncJobs(assetIds);
return JobStatus.SUCCESS;
}
private async validateImportPath(importPath: string): Promise<ValidateLibraryImportPathResponseDto> {
@@ -336,6 +339,8 @@ export class LibraryService extends BaseService {
async handleDeleteLibrary(job: JobOf<JobName.LIBRARY_DELETE>): Promise<JobStatus> {
const libraryId = job.id;
await this.assetRepository.updateByLibraryId(libraryId, { deletedAt: new Date() });
const assetPagination = usePagination(JOBS_LIBRARY_PAGINATION_SIZE, (pagination) =>
this.assetRepository.getAll(pagination, { libraryId, withDeleted: true }),
);
@@ -367,84 +372,52 @@ export class LibraryService extends BaseService {
return JobStatus.SUCCESS;
}
@OnJob({ name: JobName.LIBRARY_SYNC_FILE, queue: QueueName.LIBRARY })
async handleSyncFile(job: JobOf<JobName.LIBRARY_SYNC_FILE>): Promise<JobStatus> {
// Only needs to handle new assets
const assetPath = path.normalize(job.assetPath);
private processEntity(filePath: string, ownerId: string, libraryId: string) {
const assetPath = path.normalize(filePath);
let asset = await this.assetRepository.getByLibraryIdAndOriginalPath(job.id, assetPath);
if (asset) {
return JobStatus.SKIPPED;
}
let stat;
try {
stat = await this.storageRepository.stat(assetPath);
} catch (error: any) {
if (error.code === 'ENOENT') {
this.logger.error(`File not found: ${assetPath}`);
return JobStatus.SKIPPED;
}
this.logger.error(`Error reading file: ${assetPath}. Error: ${error}`);
return JobStatus.FAILED;
}
this.logger.log(`Importing new library asset: ${assetPath}`);
const library = await this.libraryRepository.get(job.id, true);
if (!library || library.deletedAt) {
this.logger.error('Cannot import asset into deleted library');
return JobStatus.FAILED;
}
// TODO: device asset id is deprecated, remove it
const deviceAssetId = `${basename(assetPath)}`.replaceAll(/\s+/g, '');
const pathHash = this.cryptoRepository.hashSha1(`path:${assetPath}`);
const assetType = mimeTypes.isVideo(assetPath) ? AssetType.VIDEO : AssetType.IMAGE;
const mtime = stat.mtime;
asset = await this.assetRepository.create({
ownerId: job.ownerId,
libraryId: job.id,
checksum: pathHash,
return {
ownerId,
libraryId,
checksum: this.cryptoRepository.hashSha1(`path:${assetPath}`),
originalPath: assetPath,
deviceAssetId,
fileCreatedAt: null,
fileModifiedAt: null,
localDateTime: null,
// TODO: device asset id is deprecated, remove it
deviceAssetId: `${basename(assetPath)}`.replaceAll(/\s+/g, ''),
deviceId: 'Library Import',
fileCreatedAt: mtime,
fileModifiedAt: mtime,
localDateTime: mtime,
type: assetType,
type: mimeTypes.isVideo(assetPath) ? AssetType.VIDEO : AssetType.IMAGE,
originalFileName: parse(assetPath).base,
isExternal: true,
});
await this.queuePostSyncJobs(asset);
return JobStatus.SUCCESS;
livePhotoVideoId: null,
};
}
async queuePostSyncJobs(asset: AssetEntity) {
this.logger.debug(`Queueing metadata extraction for: ${asset.originalPath}`);
async queuePostSyncJobs(assetIds: string[]) {
this.logger.debug(`Queuing sidecar discovery for ${assetIds.length} asset(s)`);
// We queue a sidecar discovery which, in turn, queues metadata extraction
await this.jobRepository.queue({
name: JobName.SIDECAR_DISCOVERY,
data: { id: asset.id, source: 'upload' },
});
await this.jobRepository.queueAll(
assetIds.map((assetId) => ({
name: JobName.SIDECAR_DISCOVERY,
data: { id: assetId, source: 'upload' },
})),
);
}
async queueScan(id: string) {
await this.findOrFail(id);
this.logger.log(`Starting to scan library ${id}`);
await this.jobRepository.queue({
name: JobName.LIBRARY_QUEUE_SYNC_FILES,
data: {
id,
},
});
await this.jobRepository.queue({ name: JobName.LIBRARY_QUEUE_SYNC_ASSETS, data: { id } });
}
@@ -454,11 +427,12 @@ export class LibraryService extends BaseService {
@OnJob({ name: JobName.LIBRARY_QUEUE_SCAN_ALL, queue: QueueName.LIBRARY })
async handleQueueScanAll(): Promise<JobStatus> {
this.logger.log(`Refreshing all external libraries`);
this.logger.log(`Initiating scan of all external libraries...`);
await this.jobRepository.queue({ name: JobName.LIBRARY_QUEUE_CLEANUP, data: {} });
const libraries = await this.libraryRepository.getAll(true);
await this.jobRepository.queueAll(
libraries.map((library) => ({
name: JobName.LIBRARY_QUEUE_SYNC_FILES,
@@ -475,64 +449,141 @@ export class LibraryService extends BaseService {
},
})),
);
return JobStatus.SUCCESS;
}
@OnJob({ name: JobName.LIBRARY_SYNC_ASSET, queue: QueueName.LIBRARY })
async handleSyncAsset(job: JobOf<JobName.LIBRARY_SYNC_ASSET>): Promise<JobStatus> {
const asset = await this.assetRepository.getById(job.id);
if (!asset) {
return JobStatus.SKIPPED;
}
@OnJob({ name: JobName.LIBRARY_SYNC_ASSETS, queue: QueueName.LIBRARY })
async handleSyncAssets(job: JobOf<JobName.LIBRARY_SYNC_ASSETS>): Promise<JobStatus> {
const assets = await this.assetRepository.getByIds(job.assetIds);
const markOffline = async (explanation: string) => {
if (!asset.isOffline) {
this.logger.debug(`${explanation}, removing: ${asset.originalPath}`);
await this.assetRepository.updateAll([asset.id], { isOffline: true, deletedAt: new Date() });
const assetIdsToOffline: string[] = [];
const trashedAssetIdsToOffline: string[] = [];
const assetIdsToOnline: string[] = [];
const trashedAssetIdsToOnline: string[] = [];
const assetIdsToUpdate: string[] = [];
this.logger.debug(`Checking batch of ${assets.length} existing asset(s) in library ${job.libraryId}`);
const stats = await Promise.all(
assets.map((asset) => this.storageRepository.stat(asset.originalPath).catch(() => null)),
);
for (let i = 0; i < assets.length; i++) {
const asset = assets[i];
const stat = stats[i];
const action = this.checkExistingAsset(asset, stat);
switch (action) {
case AssetSyncResult.OFFLINE: {
if (asset.status === AssetStatus.TRASHED) {
trashedAssetIdsToOffline.push(asset.id);
} else {
assetIdsToOffline.push(asset.id);
}
break;
}
case AssetSyncResult.UPDATE: {
assetIdsToUpdate.push(asset.id);
break;
}
case AssetSyncResult.CHECK_OFFLINE: {
const isInImportPath = job.importPaths.find((path) => asset.originalPath.startsWith(path));
if (!isInImportPath) {
this.logger.verbose(
`Offline asset ${asset.originalPath} is still not in any import path, keeping offline in library ${job.libraryId}`,
);
break;
}
const isExcluded = job.exclusionPatterns.some((pattern) => picomatch.isMatch(asset.originalPath, pattern));
if (!isExcluded) {
this.logger.debug(`Offline asset ${asset.originalPath} is now online in library ${job.libraryId}`);
if (asset.status === AssetStatus.TRASHED) {
trashedAssetIdsToOnline.push(asset.id);
} else {
assetIdsToOnline.push(asset.id);
}
break;
}
this.logger.verbose(
`Offline asset ${asset.originalPath} is in an import path but still covered by exclusion pattern, keeping offline in library ${job.libraryId}`,
);
break;
}
}
};
const isInPath = job.importPaths.find((path) => asset.originalPath.startsWith(path));
if (!isInPath) {
await markOffline('Asset is no longer in an import path');
return JobStatus.SUCCESS;
}
const isExcluded = job.exclusionPatterns.some((pattern) => picomatch.isMatch(asset.originalPath, pattern));
if (isExcluded) {
await markOffline('Asset is covered by an exclusion pattern');
return JobStatus.SUCCESS;
const promises = [];
if (assetIdsToOffline.length > 0) {
promises.push(this.assetRepository.updateAll(assetIdsToOffline, { isOffline: true, deletedAt: new Date() }));
}
let stat;
try {
stat = await this.storageRepository.stat(asset.originalPath);
} catch {
await markOffline('Asset is no longer on disk or is inaccessible because of permissions');
return JobStatus.SUCCESS;
if (trashedAssetIdsToOffline.length > 0) {
promises.push(this.assetRepository.updateAll(trashedAssetIdsToOffline, { isOffline: true }));
}
const mtime = stat.mtime;
const isAssetModified = !asset.fileModifiedAt || mtime.toISOString() !== asset.fileModifiedAt.toISOString();
if (asset.isOffline || isAssetModified) {
this.logger.debug(`Asset was offline or modified, updating asset record ${asset.originalPath}`);
//TODO: When we have asset status, we need to leave deletedAt as is when status is trashed
await this.assetRepository.updateAll([asset.id], {
isOffline: false,
deletedAt: null,
fileModifiedAt: mtime,
originalFileName: parse(asset.originalPath).base,
});
if (assetIdsToOnline.length > 0) {
promises.push(this.assetRepository.updateAll(assetIdsToOnline, { isOffline: false, deletedAt: null }));
}
if (isAssetModified) {
this.logger.debug(`Asset was modified, queuing metadata extraction for: ${asset.originalPath}`);
await this.queuePostSyncJobs(asset);
if (trashedAssetIdsToOnline.length > 0) {
promises.push(this.assetRepository.updateAll(trashedAssetIdsToOnline, { isOffline: false }));
}
if (assetIdsToUpdate.length > 0) {
promises.push(this.queuePostSyncJobs(assetIdsToUpdate));
}
await Promise.all(promises);
const remainingCount = assets.length - assetIdsToOffline.length - assetIdsToUpdate.length - assetIdsToOnline.length;
const cumulativePercentage = ((100 * job.progressCounter) / job.totalAssets).toFixed(1);
this.logger.log(
`Checked existing asset(s): ${assetIdsToOffline.length + trashedAssetIdsToOffline.length} offlined, ${assetIdsToOnline.length + trashedAssetIdsToOnline.length} onlined, ${assetIdsToUpdate.length} updated, ${remainingCount} unchanged of current batch of ${assets.length} (Total progress: ${job.progressCounter} of ${job.totalAssets}, ${cumulativePercentage} %) in library ${job.libraryId}.`,
);
return JobStatus.SUCCESS;
}
private checkExistingAsset(asset: AssetEntity, stat: Stats | null): AssetSyncResult {
if (!stat) {
// File not found on disk or permission error
if (asset.isOffline) {
this.logger.verbose(
`Asset ${asset.originalPath} is still not accessible, keeping offline in library ${asset.libraryId}`,
);
return AssetSyncResult.DO_NOTHING;
}
this.logger.debug(
`Asset ${asset.originalPath} is no longer on disk or is inaccessible because of permissions, marking offline in library ${asset.libraryId}`,
);
return AssetSyncResult.OFFLINE;
}
if (asset.isOffline && asset.status !== AssetStatus.DELETED) {
// Only perform the expensive check if the asset is offline
return AssetSyncResult.CHECK_OFFLINE;
}
if (
!asset.fileCreatedAt ||
!asset.localDateTime ||
!asset.fileModifiedAt ||
stat.mtime.valueOf() !== asset.fileModifiedAt.valueOf()
) {
this.logger.verbose(`Asset ${asset.originalPath} needs metadata extraction in library ${asset.libraryId}`);
return AssetSyncResult.UPDATE;
}
return AssetSyncResult.DO_NOTHING;
}
@OnJob({ name: JobName.LIBRARY_QUEUE_SYNC_FILES, queue: QueueName.LIBRARY })
async handleQueueSyncFiles(job: JobOf<JobName.LIBRARY_QUEUE_SYNC_FILES>): Promise<JobStatus> {
const library = await this.libraryRepository.get(job.id);
@@ -541,7 +592,7 @@ export class LibraryService extends BaseService {
return JobStatus.SKIPPED;
}
this.logger.log(`Refreshing library ${library.id} for new assets`);
this.logger.debug(`Validating import paths for library ${library.id}...`);
const validImportPaths: string[] = [];
@@ -556,35 +607,67 @@ export class LibraryService extends BaseService {
if (validImportPaths.length === 0) {
this.logger.warn(`No valid import paths found for library ${library.id}`);
return JobStatus.SKIPPED;
}
const assetsOnDisk = this.storageRepository.walk({
const pathsOnDisk = this.storageRepository.walk({
pathsToCrawl: validImportPaths,
includeHidden: false,
exclusionPatterns: library.exclusionPatterns,
take: JOBS_LIBRARY_PAGINATION_SIZE,
});
let count = 0;
let importCount = 0;
let crawlCount = 0;
for await (const assetBatch of assetsOnDisk) {
count += assetBatch.length;
this.logger.debug(`Discovered ${count} asset(s) on disk for library ${library.id}...`);
await this.syncFiles(library, assetBatch);
this.logger.verbose(`Queued scan of ${assetBatch.length} crawled asset(s) in library ${library.id}...`);
this.logger.log(`Starting disk crawl of ${validImportPaths.length} import path(s) for library ${library.id}...`);
for await (const pathBatch of pathsOnDisk) {
crawlCount += pathBatch.length;
const paths = await this.assetRepository.filterNewExternalAssetPaths(library.id, pathBatch);
if (paths.length > 0) {
importCount += paths.length;
await this.jobRepository.queue({
name: JobName.LIBRARY_SYNC_FILES,
data: {
libraryId: library.id,
paths,
progressCounter: crawlCount,
},
});
}
this.logger.log(
`Crawled ${crawlCount} file(s) so far: ${paths.length} of current batch of ${pathBatch.length} will be imported to library ${library.id}...`,
);
}
if (count > 0) {
this.logger.debug(`Finished queueing scan of ${count} assets on disk for library ${library.id}`);
} else if (validImportPaths.length > 0) {
this.logger.debug(`No non-excluded assets found in any import path for library ${library.id}`);
}
this.logger.log(
`Finished disk crawl, ${crawlCount} file(s) found on disk and queued ${importCount} file(s) for import into ${library.id}`,
);
await this.libraryRepository.update(job.id, { refreshedAt: new Date() });
return JobStatus.SUCCESS;
}
@OnJob({ name: JobName.LIBRARY_ASSET_REMOVAL, queue: QueueName.LIBRARY })
async handleAssetRemoval(job: JobOf<JobName.LIBRARY_ASSET_REMOVAL>): Promise<JobStatus> {
// This is only for handling file unlink events via the file watcher
this.logger.verbose(`Deleting asset(s) ${job.paths} from library ${job.libraryId}`);
for (const assetPath of job.paths) {
const asset = await this.assetRepository.getByLibraryIdAndOriginalPath(job.libraryId, assetPath);
if (asset) {
await this.assetRepository.remove(asset);
}
}
return JobStatus.SUCCESS;
}
@OnJob({ name: JobName.LIBRARY_QUEUE_SYNC_ASSETS, queue: QueueName.LIBRARY })
async handleQueueSyncAssets(job: JobOf<JobName.LIBRARY_QUEUE_SYNC_ASSETS>): Promise<JobStatus> {
const library = await this.libraryRepository.get(job.id);
@@ -592,27 +675,68 @@ export class LibraryService extends BaseService {
return JobStatus.SKIPPED;
}
this.logger.log(`Scanning library ${library.id} for removed assets`);
const assetCount = await this.assetRepository.getLibraryAssetCount(job.id);
const onlineAssets = usePagination(JOBS_LIBRARY_PAGINATION_SIZE, (pagination) =>
this.assetRepository.getAll(pagination, { libraryId: job.id, withDeleted: true }),
);
let assetCount = 0;
for await (const assets of onlineAssets) {
assetCount += assets.length;
this.logger.debug(`Discovered ${assetCount} asset(s) in library ${library.id}...`);
await this.jobRepository.queueAll(
assets.map((asset) => ({
name: JobName.LIBRARY_SYNC_ASSET,
data: { id: asset.id, importPaths: library.importPaths, exclusionPatterns: library.exclusionPatterns },
})),
);
this.logger.debug(`Queued check of ${assets.length} asset(s) in library ${library.id}...`);
if (!assetCount) {
this.logger.log(`Library ${library.id} is empty, no need to check assets`);
return JobStatus.SUCCESS;
}
if (assetCount) {
this.logger.log(`Finished queueing check of ${assetCount} assets for library ${library.id}`);
this.logger.log(
`Checking ${assetCount} asset(s) against import paths and exclusion patterns in library ${library.id}...`,
);
const offlineResult = await this.assetRepository.detectOfflineExternalAssets(
library.id,
library.importPaths,
library.exclusionPatterns,
);
const affectedAssetCount = Number(offlineResult.numUpdatedRows);
this.logger.log(
`${affectedAssetCount} asset(s) out of ${assetCount} were offlined due to import paths and/or exclusion pattern(s) in library ${library.id}`,
);
if (affectedAssetCount === assetCount) {
return JobStatus.SUCCESS;
}
this.logger.log(`Scanning library ${library.id} for assets missing from disk...`);
const existingAssets = usePagination(JOBS_LIBRARY_PAGINATION_SIZE, (pagination) =>
this.assetRepository.getAllInLibrary(pagination, job.id),
);
let currentAssetCount = 0;
for await (const assets of existingAssets) {
if (assets.length === 0) {
throw new BadRequestException(`Failed to get assets for library ${job.id}`);
}
currentAssetCount += assets.length;
await this.jobRepository.queue({
name: JobName.LIBRARY_SYNC_ASSETS,
data: {
libraryId: library.id,
importPaths: library.importPaths,
exclusionPatterns: library.exclusionPatterns,
assetIds: assets.map(({ id }) => id),
progressCounter: currentAssetCount,
totalAssets: assetCount,
},
});
const completePercentage = ((100 * currentAssetCount) / assetCount).toFixed(1);
this.logger.log(
`Queued check of ${currentAssetCount} of ${assetCount} (${completePercentage} %) existing asset(s) so far in library ${library.id}`,
);
}
if (currentAssetCount) {
this.logger.log(`Finished queuing ${currentAssetCount} asset check(s) for library ${library.id}`);
}
return JobStatus.SUCCESS;