Merge remote-tracking branch 'origin/lighter_buckets_web' into lighter_buckets_server

This commit is contained in:
Min Idzelis
2025-05-02 23:24:34 +00:00
143 changed files with 2532 additions and 4564 deletions
@@ -135,6 +135,36 @@ export class AssetJobRepository {
.execute();
}
private assetsWithPreviews() {
return this.db
.selectFrom('assets')
.where('assets.isVisible', '=', true)
.where('assets.deletedAt', 'is', null)
.innerJoin('asset_job_status as job_status', 'assetId', 'assets.id')
.where('job_status.previewAt', 'is not', null);
}
@GenerateSql({ params: [], stream: true })
streamForSearchDuplicates(force?: boolean) {
return this.assetsWithPreviews()
.where((eb) => eb.not((eb) => eb.exists(eb.selectFrom('smart_search').whereRef('assetId', '=', 'assets.id'))))
.$if(!force, (qb) => qb.where('job_status.duplicatesDetectedAt', 'is', null))
.select(['assets.id'])
.stream();
}
@GenerateSql({ params: [], stream: true })
streamForEncodeClip(force?: boolean) {
return this.assetsWithPreviews()
.select(['assets.id'])
.$if(!force, (qb) =>
qb.where((eb) =>
eb.not((eb) => eb.exists(eb.selectFrom('smart_search').whereRef('assetId', '=', 'assets.id'))),
),
)
.stream();
}
@GenerateSql({ params: [DummyValue.UUID] })
getForClipEncoding(id: string) {
return this.db
@@ -292,4 +322,30 @@ export class AssetJobRepository {
.where('assets.deletedAt', '<=', trashedBefore)
.stream();
}
@GenerateSql({ params: [], stream: true })
streamForSidecar(force?: boolean) {
return this.db
.selectFrom('assets')
.select(['assets.id'])
.$if(!force, (qb) =>
qb.where((eb) => eb.or([eb('assets.sidecarPath', '=', ''), eb('assets.sidecarPath', 'is', null)])),
)
.where('assets.isVisible', '=', true)
.stream();
}
@GenerateSql({ params: [], stream: true })
streamForDetectFacesJob(force?: boolean) {
return this.assetsWithPreviews()
.$if(!force, (qb) => qb.where('job_status.facesRecognizedAt', 'is', null))
.select(['assets.id'])
.orderBy('assets.createdAt', 'desc')
.stream();
}
@GenerateSql({ params: [DummyValue.DATE], stream: true })
streamForMigrationJob() {
return this.db.selectFrom('assets').select(['id']).where('assets.deletedAt', 'is', null).stream();
}
}
+4 -103
View File
@@ -7,14 +7,12 @@ import { AssetFiles, AssetJobStatus, Assets, DB, Exif } from 'src/db';
import { Chunked, ChunkedArray, DummyValue, GenerateSql } from 'src/decorators';
import { MapAsset } from 'src/dtos/asset-response.dto';
import { AssetFileType, AssetOrder, AssetStatus, AssetType } from 'src/enum';
import { AssetSearchOptions, SearchExploreItem, SearchExploreItemSet } from 'src/repositories/search.repository';
import {
anyUuid,
asUuid,
hasPeople,
hasPeopleNoJoin,
removeUndefinedKeys,
searchAssetBuilder,
truncatedDate,
unnest,
withExif,
@@ -29,7 +27,7 @@ import {
withTags,
} from 'src/utils/database';
import { globToSqlPattern } from 'src/utils/misc';
import { PaginationOptions, paginationHelper } from 'src/utils/pagination';
import { PaginationOptions } from 'src/utils/pagination';
export type AssetStats = Record<AssetType, number>;
@@ -47,16 +45,6 @@ export interface LivePhotoSearchOptions {
type: AssetType;
}
export enum WithoutProperty {
THUMBNAIL = 'thumbnail',
ENCODED_VIDEO = 'encoded-video',
EXIF = 'exif',
SMART_SEARCH = 'smart-search',
DUPLICATE = 'duplicate',
FACES = 'faces',
SIDECAR = 'sidecar',
}
export enum WithProperty {
SIDECAR = 'sidecar',
}
@@ -337,10 +325,6 @@ export class AssetRepository {
return assets.map((asset) => asset.deviceAssetId);
}
getByUserId(pagination: PaginationOptions, userId: string, options: Omit<AssetSearchOptions, 'userIds'> = {}) {
return this.getAll(pagination, { ...options, userIds: [userId] });
}
@GenerateSql({ params: [DummyValue.UUID, DummyValue.STRING] })
getByLibraryIdAndOriginalPath(libraryId: string, originalPath: string) {
return this.db
@@ -352,16 +336,6 @@ export class AssetRepository {
.executeTakeFirst();
}
async getAll(pagination: PaginationOptions, { orderDirection, ...options }: AssetSearchOptions = {}) {
const builder = searchAssetBuilder(this.db, options)
.select(withFiles)
.orderBy('assets.createdAt', orderDirection ?? 'asc')
.limit(pagination.take + 1)
.offset(pagination.skip ?? 0);
const items = await builder.execute();
return paginationHelper(items, pagination.take);
}
/**
* Get assets by device's Id on the database
* @param ownerId
@@ -531,77 +505,6 @@ export class AssetRepository {
.executeTakeFirst();
}
@GenerateSql(
...Object.values(WithProperty).map((property) => ({
name: property,
params: [DummyValue.PAGINATION, property],
})),
)
async getWithout(pagination: PaginationOptions, property: WithoutProperty) {
const items = await this.db
.selectFrom('assets')
.selectAll('assets')
.$if(property === WithoutProperty.DUPLICATE, (qb) =>
qb
.innerJoin('asset_job_status as job_status', 'assets.id', 'job_status.assetId')
.where('job_status.duplicatesDetectedAt', 'is', null)
.where('job_status.previewAt', 'is not', null)
.where((eb) => eb.exists(eb.selectFrom('smart_search').where('assetId', '=', eb.ref('assets.id'))))
.where('assets.isVisible', '=', true),
)
.$if(property === WithoutProperty.ENCODED_VIDEO, (qb) =>
qb
.where('assets.type', '=', AssetType.VIDEO)
.where((eb) => eb.or([eb('assets.encodedVideoPath', 'is', null), eb('assets.encodedVideoPath', '=', '')])),
)
.$if(property === WithoutProperty.EXIF, (qb) =>
qb
.leftJoin('asset_job_status as job_status', 'assets.id', 'job_status.assetId')
.where((eb) => eb.or([eb('job_status.metadataExtractedAt', 'is', null), eb('assetId', 'is', null)]))
.where('assets.isVisible', '=', true),
)
.$if(property === WithoutProperty.FACES, (qb) =>
qb
.innerJoin('asset_job_status as job_status', 'assetId', 'assets.id')
.where('job_status.previewAt', 'is not', null)
.where('job_status.facesRecognizedAt', 'is', null)
.where('assets.isVisible', '=', true),
)
.$if(property === WithoutProperty.SIDECAR, (qb) =>
qb
.where((eb) => eb.or([eb('assets.sidecarPath', '=', ''), eb('assets.sidecarPath', 'is', null)]))
.where('assets.isVisible', '=', true),
)
.$if(property === WithoutProperty.SMART_SEARCH, (qb) =>
qb
.innerJoin('asset_job_status as job_status', 'assetId', 'assets.id')
.where('job_status.previewAt', 'is not', null)
.where('assets.isVisible', '=', true)
.where((eb) =>
eb.not((eb) => eb.exists(eb.selectFrom('smart_search').whereRef('assetId', '=', 'assets.id'))),
),
)
.$if(property === WithoutProperty.THUMBNAIL, (qb) =>
qb
.innerJoin('asset_job_status as job_status', 'assetId', 'assets.id')
.where('assets.isVisible', '=', true)
.where((eb) =>
eb.or([
eb('job_status.previewAt', 'is', null),
eb('job_status.thumbnailAt', 'is', null),
eb('assets.thumbhash', 'is', null),
]),
),
)
.where('deletedAt', 'is', null)
.limit(pagination.take + 1)
.offset(pagination.skip ?? 0)
.orderBy('createdAt')
.execute();
return paginationHelper(items, pagination.take);
}
getStatistics(ownerId: string, { isArchived, isFavorite, isTrashed }: AssetStatsOptions): Promise<AssetStats> {
return this.db
.selectFrom('assets')
@@ -816,10 +719,7 @@ export class AssetRepository {
}
@GenerateSql({ params: [DummyValue.UUID, { minAssetsPerField: 5, maxFields: 12 }] })
async getAssetIdByCity(
ownerId: string,
{ minAssetsPerField, maxFields }: AssetExploreFieldOptions,
): Promise<SearchExploreItem<string>> {
async getAssetIdByCity(ownerId: string, { minAssetsPerField, maxFields }: AssetExploreFieldOptions) {
const items = await this.db
.with('cities', (qb) =>
qb
@@ -834,6 +734,7 @@ export class AssetRepository {
.innerJoin('cities', 'exif.city', 'cities.city')
.distinctOn('exif.city')
.select(['assetId as data', 'exif.city as value'])
.$narrowType<{ value: NotNull }>()
.where('ownerId', '=', asUuid(ownerId))
.where('isVisible', '=', true)
.where('isArchived', '=', false)
@@ -842,7 +743,7 @@ export class AssetRepository {
.limit(maxFields)
.execute();
return { fieldName: 'exifInfo.city', items: items as SearchExploreItemSet<string> };
return { fieldName: 'exifInfo.city', items };
}
@GenerateSql({
@@ -12,6 +12,7 @@ import { DatabaseExtension, DatabaseLock, VectorIndex } from 'src/enum';
import { ConfigRepository } from 'src/repositories/config.repository';
import { LoggingRepository } from 'src/repositories/logging.repository';
import { ExtensionVersion, VectorExtension, VectorUpdateResult } from 'src/types';
import { vectorIndexQuery } from 'src/utils/database';
import { isValidInteger } from 'src/validation';
import { DataSource } from 'typeorm';
@@ -119,12 +120,7 @@ export class DatabaseRepository {
await sql`ALTER TABLE ${sql.raw(table)} ALTER COLUMN embedding SET DATA TYPE vector(${sql.raw(String(dimSize))})`.execute(
tx,
);
await sql`SET vectors.pgvector_compatibility=on`.execute(tx);
await sql`
CREATE INDEX IF NOT EXISTS ${sql.raw(index)} ON ${sql.raw(table)}
USING hnsw (embedding vector_cosine_ops)
WITH (ef_construction = 300, m = 16)
`.execute(tx);
await sql.raw(vectorIndexQuery({ vectorExtension: this.vectorExtension, table, indexName: index })).execute(tx);
});
}
}
+3 -2
View File
@@ -19,7 +19,7 @@ import { ReleaseNotification, ServerVersionResponseDto } from 'src/dtos/server.d
import { ImmichWorker, MetadataKey, QueueName } from 'src/enum';
import { ConfigRepository } from 'src/repositories/config.repository';
import { LoggingRepository } from 'src/repositories/logging.repository';
import { JobItem } from 'src/types';
import { JobItem, JobSource } from 'src/types';
import { handlePromiseError } from 'src/utils/misc';
type EmitHandlers = Partial<{ [T in EmitEvent]: Array<EventItem<T>> }>;
@@ -48,7 +48,7 @@ type EventMap = {
'config.validate': [{ newConfig: SystemConfig; oldConfig: SystemConfig }];
// album events
'album.update': [{ id: string; recipientIds: string[] }];
'album.update': [{ id: string; recipientId: string }];
'album.invite': [{ id: string; userId: string }];
// asset events
@@ -58,6 +58,7 @@ type EventMap = {
'asset.show': [{ assetId: string; userId: string }];
'asset.trash': [{ assetId: string; userId: string }];
'asset.delete': [{ assetId: string; userId: string }];
'asset.metadataExtracted': [{ assetId: string; userId: string; source?: JobSource }];
// asset bulk events
'assets.trash': [{ assetIds: string[]; userId: string }];
+10 -14
View File
@@ -9,7 +9,7 @@ import { JobName, JobStatus, MetadataKey, QueueCleanType, QueueName } from 'src/
import { ConfigRepository } from 'src/repositories/config.repository';
import { EventRepository } from 'src/repositories/event.repository';
import { LoggingRepository } from 'src/repositories/logging.repository';
import { IEntityJob, JobCounts, JobItem, JobOf, QueueStatus } from 'src/types';
import { JobCounts, JobItem, JobOf, QueueStatus } from 'src/types';
import { getKeyByValue, getMethodNames, ImmichStartupError } from 'src/utils/misc';
type JobMapItem = {
@@ -206,7 +206,10 @@ export class JobRepository {
private getJobOptions(item: JobItem): JobsOptions | null {
switch (item.name) {
case JobName.NOTIFY_ALBUM_UPDATE: {
return { jobId: item.data.id, delay: item.data?.delay };
return {
jobId: `${item.data.id}/${item.data.recipientId}`,
delay: item.data?.delay,
};
}
case JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE: {
return { jobId: item.data.id };
@@ -227,19 +230,12 @@ export class JobRepository {
return this.moduleRef.get<Queue>(getQueueToken(queue), { strict: false });
}
public async removeJob(jobId: string, name: JobName): Promise<IEntityJob | undefined> {
const existingJob = await this.getQueue(this.getQueueName(name)).getJob(jobId);
if (!existingJob) {
return;
}
try {
/** @deprecated */
// todo: remove this when asset notifications no longer need it.
public async removeJob(name: JobName, jobID: string): Promise<void> {
const existingJob = await this.getQueue(this.getQueueName(name)).getJob(jobID);
if (existingJob) {
await existingJob.remove();
} catch (error: any) {
if (error.message?.includes('Missing key for job')) {
return;
}
throw error;
}
return existingJob.data;
}
}
@@ -5,7 +5,7 @@ import { Telemetry } from 'src/decorators';
import { LogLevel } from 'src/enum';
import { ConfigRepository } from 'src/repositories/config.repository';
type LogDetails = any[];
type LogDetails = any;
type LogFunction = () => string;
const LOG_LEVELS = [LogLevel.VERBOSE, LogLevel.DEBUG, LogLevel.LOG, LogLevel.WARN, LogLevel.ERROR, LogLevel.FATAL];
+36 -9
View File
@@ -1,16 +1,19 @@
import { Injectable, InternalServerErrorException } from '@nestjs/common';
import type { UserInfoResponse } from 'openid-client' with { 'resolution-mode': 'import' };
import { OAuthTokenEndpointAuthMethod } from 'src/enum';
import { LoggingRepository } from 'src/repositories/logging.repository';
export type OAuthConfig = {
clientId: string;
clientSecret: string;
clientSecret?: string;
issuerUrl: string;
mobileOverrideEnabled: boolean;
mobileRedirectUri: string;
profileSigningAlgorithm: string;
scope: string;
signingAlgorithm: string;
tokenEndpointAuthMethod: OAuthTokenEndpointAuthMethod;
timeout: number;
};
export type OAuthProfile = UserInfoResponse;
@@ -76,12 +79,10 @@ export class OAuthRepository {
);
}
if (error.code === 'OAUTH_INVALID_RESPONSE') {
this.logger.warn(`Invalid response from authorization server. Cause: ${error.cause?.message}`);
throw error.cause;
}
this.logger.error(`OAuth login failed: ${error.message}`);
this.logger.error(error);
throw error;
throw new Error('OAuth login failed', { cause: error });
}
}
@@ -103,6 +104,8 @@ export class OAuthRepository {
clientSecret,
profileSigningAlgorithm,
signingAlgorithm,
tokenEndpointAuthMethod,
timeout,
}: OAuthConfig) {
try {
const { allowInsecureRequests, discovery } = await import('openid-client');
@@ -114,14 +117,38 @@ export class OAuthRepository {
response_types: ['code'],
userinfo_signed_response_alg: profileSigningAlgorithm === 'none' ? undefined : profileSigningAlgorithm,
id_token_signed_response_alg: signingAlgorithm,
timeout: 30_000,
},
undefined,
{ execute: [allowInsecureRequests] },
await this.getTokenAuthMethod(tokenEndpointAuthMethod, clientSecret),
{
execute: [allowInsecureRequests],
timeout,
},
);
} catch (error: any | AggregateError) {
this.logger.error(`Error in OAuth discovery: ${error}`, error?.stack, error?.errors);
throw new InternalServerErrorException(`Error in OAuth discovery: ${error}`, { cause: error });
}
}
private async getTokenAuthMethod(tokenEndpointAuthMethod: OAuthTokenEndpointAuthMethod, clientSecret?: string) {
const { None, ClientSecretPost, ClientSecretBasic } = await import('openid-client');
if (!clientSecret) {
return None();
}
switch (tokenEndpointAuthMethod) {
case OAuthTokenEndpointAuthMethod.CLIENT_SECRET_POST: {
return ClientSecretPost(clientSecret);
}
case OAuthTokenEndpointAuthMethod.CLIENT_SECRET_BASIC: {
return ClientSecretBasic(clientSecret);
}
default: {
return None();
}
}
}
}
+2 -6
View File
@@ -6,7 +6,7 @@ import { AssetFaces, DB, FaceSearch, Person } from 'src/db';
import { ChunkedArray, DummyValue, GenerateSql } from 'src/decorators';
import { AssetFileType, SourceType } from 'src/enum';
import { removeUndefinedKeys } from 'src/utils/database';
import { PaginationOptions } from 'src/utils/pagination';
import { paginationHelper, PaginationOptions } from 'src/utils/pagination';
export interface PersonSearchOptions {
minimumFaceCount: number;
@@ -200,11 +200,7 @@ export class PersonRepository {
.limit(pagination.take + 1)
.execute();
if (items.length > pagination.take) {
return { items: items.slice(0, -1), hasNextPage: true };
}
return { items, hasNextPage: false };
return paginationHelper(items, pagination.take);
}
@GenerateSql()
+31 -50
View File
@@ -6,42 +6,12 @@ import { DB, Exif } from 'src/db';
import { DummyValue, GenerateSql } from 'src/decorators';
import { MapAsset } from 'src/dtos/asset-response.dto';
import { AssetStatus, AssetType } from 'src/enum';
import { anyUuid, asUuid, searchAssetBuilder } from 'src/utils/database';
import { ConfigRepository } from 'src/repositories/config.repository';
import { anyUuid, asUuid, searchAssetBuilder, vectorIndexQuery } from 'src/utils/database';
import { paginationHelper } from 'src/utils/pagination';
import { isValidInteger } from 'src/validation';
export interface SearchResult<T> {
/** total matches */
total: number;
/** collection size */
count: number;
/** current page */
page: number;
/** items for page */
items: T[];
/** score */
distances: number[];
facets: SearchFacet[];
}
export interface SearchFacet {
fieldName: string;
counts: Array<{
count: number;
value: string;
}>;
}
export type SearchExploreItemSet<T> = Array<{
value: string;
data: T;
}>;
export interface SearchExploreItem<T> {
fieldName: string;
items: SearchExploreItemSet<T>;
}
export interface SearchAssetIDOptions {
export interface SearchAssetIdOptions {
checksum?: Buffer;
deviceAssetId?: string;
id?: string;
@@ -53,7 +23,7 @@ export interface SearchUserIdOptions {
userIds?: string[];
}
export type SearchIdOptions = SearchAssetIDOptions & SearchUserIdOptions;
export type SearchIdOptions = SearchAssetIdOptions & SearchUserIdOptions;
export interface SearchStatusOptions {
isArchived?: boolean;
@@ -143,8 +113,6 @@ type BaseAssetSearchOptions = SearchDateOptions &
export type AssetSearchOptions = BaseAssetSearchOptions & SearchRelationOptions;
export type AssetSearchOneToOneRelationOptions = BaseAssetSearchOptions & SearchOneToOneRelationOptions;
export type AssetSearchBuilderOptions = Omit<AssetSearchOptions, 'orderDirection'>;
export type SmartSearchOptions = SearchDateOptions &
@@ -201,7 +169,10 @@ export interface GetCameraMakesOptions {
@Injectable()
export class SearchRepository {
constructor(@InjectKysely() private db: Kysely<DB>) {}
constructor(
@InjectKysely() private db: Kysely<DB>,
private configRepository: ConfigRepository,
) {}
@GenerateSql({
params: [
@@ -222,9 +193,8 @@ export class SearchRepository {
.limit(pagination.size + 1)
.offset((pagination.page - 1) * pagination.size)
.execute();
const hasNextPage = items.length > pagination.size;
items.splice(pagination.size);
return { items, hasNextPage };
return paginationHelper(items, pagination.size);
}
@GenerateSql({
@@ -279,9 +249,7 @@ export class SearchRepository {
.offset((pagination.page - 1) * pagination.size)
.execute();
const hasNextPage = items.length > pagination.size;
items.splice(pagination.size);
return { items, hasNextPage };
return paginationHelper(items, pagination.size);
}
@GenerateSql({
@@ -446,8 +414,8 @@ export class SearchRepository {
async upsert(assetId: string, embedding: string): Promise<void> {
await this.db
.insertInto('smart_search')
.values({ assetId: asUuid(assetId), embedding } as any)
.onConflict((oc) => oc.column('assetId').doUpdateSet({ embedding } as any))
.values({ assetId, embedding })
.onConflict((oc) => oc.column('assetId').doUpdateSet((eb) => ({ embedding: eb.ref('excluded.embedding') })))
.execute();
}
@@ -469,19 +437,32 @@ export class SearchRepository {
return dimSize;
}
setDimensionSize(dimSize: number): Promise<void> {
async setDimensionSize(dimSize: number): Promise<void> {
if (!isValidInteger(dimSize, { min: 1, max: 2 ** 16 })) {
throw new Error(`Invalid CLIP dimension size: ${dimSize}`);
}
return this.db.transaction().execute(async (trx) => {
await sql`truncate ${sql.table('smart_search')}`.execute(trx);
// this is done in two transactions to handle concurrent writes
await this.db.transaction().execute(async (trx) => {
await sql`delete from ${sql.table('smart_search')}`.execute(trx);
await trx.schema.alterTable('smart_search').dropConstraint('dim_size_constraint').ifExists().execute();
await sql`alter table ${sql.table('smart_search')} add constraint dim_size_constraint check (array_length(embedding::real[], 1) = ${sql.lit(dimSize)})`.execute(
trx,
);
});
const vectorExtension = this.configRepository.getEnv().database.vectorExtension;
await this.db.transaction().execute(async (trx) => {
await sql`drop index if exists clip_index`.execute(trx);
await trx.schema
.alterTable('smart_search')
.alterColumn('embedding', (col) => col.setDataType(sql.raw(`vector(${dimSize})`)))
.execute();
await sql`reindex index clip_index`.execute(trx);
await sql.raw(vectorIndexQuery({ vectorExtension, table: 'smart_search', indexName: 'clip_index' })).execute(trx);
await trx.schema.alterTable('smart_search').dropConstraint('dim_size_constraint').ifExists().execute();
});
await sql`vacuum analyze ${sql.table('smart_search')}`.execute(this.db);
}
async deleteAllSearchEmbeddings(): Promise<void> {