feat: use pgvecto.rs (#3605)
This commit is contained in:
@@ -1,5 +1,7 @@
|
||||
import {
|
||||
AssetBuilderOptions,
|
||||
AssetCreate,
|
||||
AssetExploreFieldOptions,
|
||||
AssetSearchOptions,
|
||||
AssetStats,
|
||||
AssetStatsOptions,
|
||||
@@ -7,24 +9,25 @@ import {
|
||||
LivePhotoSearchOptions,
|
||||
MapMarker,
|
||||
MapMarkerSearchOptions,
|
||||
MetadataSearchOptions,
|
||||
MonthDay,
|
||||
Paginated,
|
||||
PaginationOptions,
|
||||
SearchExploreItem,
|
||||
TimeBucketItem,
|
||||
TimeBucketOptions,
|
||||
TimeBucketSize,
|
||||
WithoutProperty,
|
||||
WithProperty,
|
||||
WithoutProperty,
|
||||
} from '@app/domain';
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import _ from 'lodash';
|
||||
import { DateTime } from 'luxon';
|
||||
import { And, FindOptionsRelations, FindOptionsWhere, In, IsNull, LessThan, Not, Repository } from 'typeorm';
|
||||
import { AssetEntity, AssetJobStatusEntity, AssetType, ExifEntity } from '../entities';
|
||||
import { AssetEntity, AssetJobStatusEntity, AssetType, ExifEntity, SmartInfoEntity } from '../entities';
|
||||
import { DummyValue, GenerateSql } from '../infra.util';
|
||||
import OptionalBetween from '../utils/optional-between.util';
|
||||
import { paginate } from '../utils/pagination.util';
|
||||
import { OptionalBetween, paginate } from '../infra.utils';
|
||||
|
||||
const DEFAULT_SEARCH_SIZE = 250;
|
||||
|
||||
@@ -44,6 +47,7 @@ export class AssetRepository implements IAssetRepository {
|
||||
@InjectRepository(AssetEntity) private repository: Repository<AssetEntity>,
|
||||
@InjectRepository(ExifEntity) private exifRepository: Repository<ExifEntity>,
|
||||
@InjectRepository(AssetJobStatusEntity) private jobStatusRepository: Repository<AssetJobStatusEntity>,
|
||||
@InjectRepository(SmartInfoEntity) private smartInfoRepository: Repository<SmartInfoEntity>,
|
||||
) {}
|
||||
|
||||
async upsertExif(exif: Partial<ExifEntity>): Promise<void> {
|
||||
@@ -356,16 +360,20 @@ export class AssetRepository implements IAssetRepository {
|
||||
}
|
||||
|
||||
@GenerateSql({ params: [DummyValue.UUID] })
|
||||
getById(id: string): Promise<AssetEntity | null> {
|
||||
return this.repository.findOne({
|
||||
where: { id },
|
||||
relations: {
|
||||
getById(id: string, relations: FindOptionsRelations<AssetEntity>): Promise<AssetEntity | null> {
|
||||
if (!relations) {
|
||||
relations = {
|
||||
faces: {
|
||||
person: true,
|
||||
},
|
||||
library: true,
|
||||
stack: true,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
return this.repository.findOne({
|
||||
where: { id },
|
||||
relations,
|
||||
// We are specifically asking for this asset. Return it even if it is soft deleted
|
||||
withDeleted: true,
|
||||
});
|
||||
@@ -472,13 +480,13 @@ export class AssetRepository implements IAssetRepository {
|
||||
|
||||
case WithoutProperty.CLIP_ENCODING:
|
||||
relations = {
|
||||
smartInfo: true,
|
||||
smartSearch: true,
|
||||
};
|
||||
where = {
|
||||
isVisible: true,
|
||||
resizePath: Not(IsNull()),
|
||||
smartInfo: {
|
||||
clipEmbedding: IsNull(),
|
||||
smartSearch: {
|
||||
embedding: IsNull(),
|
||||
},
|
||||
};
|
||||
break;
|
||||
@@ -689,15 +697,82 @@ export class AssetRepository implements IAssetRepository {
|
||||
);
|
||||
}
|
||||
|
||||
private getBuilder(options: TimeBucketOptions) {
|
||||
const { isArchived, isFavorite, isTrashed, albumId, personId, userIds, withStacked } = options;
|
||||
@GenerateSql({ params: [DummyValue.UUID, { minAssetsPerField: 5, maxFields: 12 }] })
|
||||
async getAssetIdByCity(
|
||||
ownerId: string,
|
||||
{ minAssetsPerField, maxFields }: AssetExploreFieldOptions,
|
||||
): Promise<SearchExploreItem<string>> {
|
||||
const cte = this.exifRepository
|
||||
.createQueryBuilder('e')
|
||||
.select('city')
|
||||
.groupBy('city')
|
||||
.having('count(city) >= :minAssetsPerField', { minAssetsPerField })
|
||||
.orderBy('random()')
|
||||
.limit(maxFields);
|
||||
|
||||
const items = await this.getBuilder({
|
||||
userIds: [ownerId],
|
||||
exifInfo: false,
|
||||
assetType: AssetType.IMAGE,
|
||||
isArchived: false,
|
||||
})
|
||||
.select('c.city', 'value')
|
||||
.addSelect('asset.id', 'data')
|
||||
.distinctOn(['c.city'])
|
||||
.innerJoin('exif', 'e', 'asset.id = e."assetId"')
|
||||
.addCommonTableExpression(cte, 'cities')
|
||||
.innerJoin('cities', 'c', 'c.city = e.city')
|
||||
.limit(maxFields)
|
||||
.getRawMany();
|
||||
|
||||
return { fieldName: 'exifInfo.city', items };
|
||||
}
|
||||
|
||||
@GenerateSql({ params: [DummyValue.UUID, { minAssetsPerField: 5, maxFields: 12 }] })
|
||||
async getAssetIdByTag(
|
||||
ownerId: string,
|
||||
{ minAssetsPerField, maxFields }: AssetExploreFieldOptions,
|
||||
): Promise<SearchExploreItem<string>> {
|
||||
const cte = this.smartInfoRepository
|
||||
.createQueryBuilder('si')
|
||||
.select('unnest(tags)', 'tag')
|
||||
.groupBy('tag')
|
||||
.having('count(*) >= :minAssetsPerField', { minAssetsPerField })
|
||||
.orderBy('random()')
|
||||
.limit(maxFields);
|
||||
|
||||
const items = await this.getBuilder({
|
||||
userIds: [ownerId],
|
||||
exifInfo: false,
|
||||
assetType: AssetType.IMAGE,
|
||||
isArchived: false,
|
||||
})
|
||||
.select('unnest(si.tags)', 'value')
|
||||
.addSelect('asset.id', 'data')
|
||||
.distinctOn(['unnest(si.tags)'])
|
||||
.innerJoin('smart_info', 'si', 'asset.id = si."assetId"')
|
||||
.addCommonTableExpression(cte, 'random_tags')
|
||||
.innerJoin('random_tags', 't', 'si.tags @> ARRAY[t.tag]')
|
||||
.limit(maxFields)
|
||||
.getRawMany();
|
||||
|
||||
return { fieldName: 'smartInfo.tags', items };
|
||||
}
|
||||
|
||||
private getBuilder(options: AssetBuilderOptions) {
|
||||
const { isArchived, isFavorite, isTrashed, albumId, personId, userIds, withStacked, exifInfo, assetType } = options;
|
||||
|
||||
let builder = this.repository
|
||||
.createQueryBuilder('asset')
|
||||
.where('asset.isVisible = true')
|
||||
.andWhere('asset.fileCreatedAt < NOW()')
|
||||
.leftJoinAndSelect('asset.exifInfo', 'exifInfo')
|
||||
.leftJoinAndSelect('asset.stack', 'stack');
|
||||
.andWhere('asset.fileCreatedAt < NOW()');
|
||||
if (assetType !== undefined) {
|
||||
builder = builder.andWhere('asset.type = :assetType', { assetType });
|
||||
}
|
||||
|
||||
if (exifInfo !== false) {
|
||||
builder = builder.leftJoinAndSelect('asset.exifInfo', 'exifInfo').leftJoinAndSelect('asset.stack', 'stack');
|
||||
}
|
||||
|
||||
if (albumId) {
|
||||
builder = builder.leftJoin('asset.albums', 'album').andWhere('album.id = :albumId', { albumId });
|
||||
@@ -732,4 +807,46 @@ export class AssetRepository implements IAssetRepository {
|
||||
|
||||
return builder;
|
||||
}
|
||||
|
||||
async searchMetadata(query: string, ownerId: string, { numResults }: MetadataSearchOptions): Promise<AssetEntity[]> {
|
||||
const rows = await this.repository
|
||||
.createQueryBuilder('assets')
|
||||
.select('assets.*')
|
||||
.addSelect('e.country', 'country')
|
||||
.addSelect('e.state', 'state')
|
||||
.addSelect('e.city', 'city')
|
||||
.addSelect('e.description', 'description')
|
||||
.addSelect('e.model', 'model')
|
||||
.addSelect('e.make', 'make')
|
||||
.addSelect('COALESCE(si.tags, array[]::text[])', 'tags')
|
||||
.addSelect('COALESCE(si.objects, array[]::text[])', 'objects')
|
||||
.innerJoin('smart_info', 'si', 'si."assetId" = assets."id"')
|
||||
.innerJoin('exif', 'e', 'assets."id" = e."assetId"')
|
||||
.where('a.ownerId = :ownerId', { ownerId })
|
||||
.where(
|
||||
'(e."exifTextSearchableColumn" || si."smartInfoTextSearchableColumn") @@ PLAINTO_TSQUERY(\'english\', :query)',
|
||||
{ query },
|
||||
)
|
||||
.limit(numResults)
|
||||
.getRawMany();
|
||||
|
||||
return rows.map(
|
||||
({ tags, objects, country, state, city, description, model, make, ...assetInfo }) =>
|
||||
({
|
||||
exifInfo: {
|
||||
country,
|
||||
state,
|
||||
city,
|
||||
description,
|
||||
model,
|
||||
make,
|
||||
},
|
||||
smartInfo: {
|
||||
tags,
|
||||
objects,
|
||||
},
|
||||
...assetInfo,
|
||||
}) as AssetEntity,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,6 +21,5 @@ export * from './smart-info.repository';
|
||||
export * from './system-config.repository';
|
||||
export * from './system-metadata.repository';
|
||||
export * from './tag.repository';
|
||||
export * from './typesense.repository';
|
||||
export * from './user-token.repository';
|
||||
export * from './user.repository';
|
||||
|
||||
@@ -5,8 +5,8 @@ import {
|
||||
ISystemMetadataRepository,
|
||||
ReverseGeocodeResult,
|
||||
} from '@app/domain';
|
||||
import { DatabaseLock, RequireLock } from '@app/infra';
|
||||
import { GeodataAdmin1Entity, GeodataAdmin2Entity, GeodataPlacesEntity, SystemMetadataKey } from '@app/infra/entities';
|
||||
import { DatabaseLock } from '@app/infra/utils/database-locks';
|
||||
import { Inject, Logger } from '@nestjs/common';
|
||||
import { InjectDataSource, InjectRepository } from '@nestjs/typeorm';
|
||||
import { DefaultReadTaskOptions, exiftool, Tags } from 'exiftool-vendored';
|
||||
@@ -33,16 +33,14 @@ export class MetadataRepository implements IMetadataRepository {
|
||||
|
||||
private logger = new Logger(MetadataRepository.name);
|
||||
|
||||
@RequireLock(DatabaseLock.GeodataImport)
|
||||
async init(): Promise<void> {
|
||||
this.logger.log('Initializing metadata repository');
|
||||
const geodataDate = await readFile('/usr/src/resources/geodata-date.txt', 'utf8');
|
||||
|
||||
await this.geodataPlacesRepository.query('SELECT pg_advisory_lock($1)', [DatabaseLock.GeodataImport]);
|
||||
|
||||
const geocodingMetadata = await this.systemMetadataRepository.get(SystemMetadataKey.REVERSE_GEOCODING_STATE);
|
||||
|
||||
if (geocodingMetadata?.lastUpdate === geodataDate) {
|
||||
await this.dataSource.query('SELECT pg_advisory_unlock($1)', [DatabaseLock.GeodataImport]);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -72,7 +70,6 @@ export class MetadataRepository implements IMetadataRepository {
|
||||
lastImportFileName: CITIES_FILE,
|
||||
});
|
||||
|
||||
await this.dataSource.query('SELECT pg_advisory_unlock($1)', [DatabaseLock.GeodataImport]);
|
||||
this.logger.log('Geodata import completed');
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { In, Repository } from 'typeorm';
|
||||
import { AssetEntity, AssetFaceEntity, PersonEntity } from '../entities';
|
||||
import { DummyValue, GenerateSql } from '../infra.util';
|
||||
import { asVector } from '../infra.utils';
|
||||
|
||||
export class PersonRepository implements IPersonRepository {
|
||||
constructor(
|
||||
@@ -215,8 +216,15 @@ export class PersonRepository implements IPersonRepository {
|
||||
return this.personRepository.save(entity);
|
||||
}
|
||||
|
||||
createFace(entity: Partial<AssetFaceEntity>): Promise<AssetFaceEntity> {
|
||||
return this.assetFaceRepository.save(entity);
|
||||
async createFace(entity: AssetFaceEntity): Promise<AssetFaceEntity> {
|
||||
if (!entity.personId) {
|
||||
throw new Error('Person ID is required to create a face');
|
||||
}
|
||||
if (!entity.embedding) {
|
||||
throw new Error('Embedding is required to create a face');
|
||||
}
|
||||
await this.assetFaceRepository.insert({ ...entity, embedding: () => asVector(entity.embedding, true) });
|
||||
return this.assetFaceRepository.findOneByOrFail({ assetId: entity.assetId, personId: entity.personId });
|
||||
}
|
||||
|
||||
async update(entity: Partial<PersonEntity>): Promise<PersonEntity> {
|
||||
|
||||
@@ -1,14 +1,171 @@
|
||||
import { ISmartInfoRepository } from '@app/domain';
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Embedding, EmbeddingSearch, ISmartInfoRepository } from '@app/domain';
|
||||
import { getCLIPModelInfo } from '@app/domain/smart-info/smart-info.constant';
|
||||
import { DatabaseLock, RequireLock, asyncLock } from '@app/infra';
|
||||
import { AssetEntity, AssetFaceEntity, SmartInfoEntity, SmartSearchEntity } from '@app/infra/entities';
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { Repository } from 'typeorm';
|
||||
import { SmartInfoEntity } from '../entities';
|
||||
import { DummyValue, GenerateSql } from '../infra.util';
|
||||
import { asVector, isValidInteger } from '../infra.utils';
|
||||
|
||||
@Injectable()
|
||||
export class SmartInfoRepository implements ISmartInfoRepository {
|
||||
constructor(@InjectRepository(SmartInfoEntity) private repository: Repository<SmartInfoEntity>) {}
|
||||
private logger = new Logger(SmartInfoRepository.name);
|
||||
|
||||
async upsert(info: Partial<SmartInfoEntity>): Promise<void> {
|
||||
await this.repository.upsert(info, { conflictPaths: ['assetId'] });
|
||||
constructor(
|
||||
@InjectRepository(SmartInfoEntity) private repository: Repository<SmartInfoEntity>,
|
||||
@InjectRepository(AssetEntity) private assetRepository: Repository<AssetEntity>,
|
||||
@InjectRepository(AssetFaceEntity) private assetFaceRepository: Repository<AssetFaceEntity>,
|
||||
@InjectRepository(SmartSearchEntity) private smartSearchRepository: Repository<SmartSearchEntity>,
|
||||
) {}
|
||||
|
||||
async init(modelName: string): Promise<void> {
|
||||
const { dimSize } = getCLIPModelInfo(modelName);
|
||||
if (dimSize == null) {
|
||||
throw new Error(`Invalid CLIP model name: ${modelName}`);
|
||||
}
|
||||
|
||||
const curDimSize = await this.getDimSize();
|
||||
this.logger.verbose(`Current database CLIP dimension size is ${curDimSize}`);
|
||||
|
||||
if (dimSize != curDimSize) {
|
||||
this.logger.log(`Dimension size of model ${modelName} is ${dimSize}, but database expects ${curDimSize}.`);
|
||||
await this.updateDimSize(dimSize);
|
||||
}
|
||||
}
|
||||
|
||||
@GenerateSql({
|
||||
params: [{ ownerId: DummyValue.UUID, embedding: Array.from({ length: 512 }, Math.random), numResults: 100 }],
|
||||
})
|
||||
async searchCLIP({ ownerId, embedding, numResults }: EmbeddingSearch): Promise<AssetEntity[]> {
|
||||
if (!isValidInteger(numResults, { min: 1 })) {
|
||||
throw new Error(`Invalid value for 'numResults': ${numResults}`);
|
||||
}
|
||||
|
||||
let results: AssetEntity[] = [];
|
||||
await this.assetRepository.manager.transaction(async (manager) => {
|
||||
await manager.query(`SET LOCAL vectors.k = '${numResults}'`);
|
||||
results = await manager
|
||||
.createQueryBuilder(AssetEntity, 'a')
|
||||
.innerJoin('a.smartSearch', 's')
|
||||
.where('a.ownerId = :ownerId')
|
||||
.leftJoinAndSelect('a.exifInfo', 'e')
|
||||
.orderBy('s.embedding <=> :embedding')
|
||||
.setParameters({ ownerId, embedding: asVector(embedding) })
|
||||
.limit(numResults)
|
||||
.getMany();
|
||||
});
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
@GenerateSql({
|
||||
params: [
|
||||
{
|
||||
ownerId: DummyValue.UUID,
|
||||
embedding: Array.from({ length: 512 }, Math.random),
|
||||
numResults: 100,
|
||||
maxDistance: 0.6,
|
||||
},
|
||||
],
|
||||
})
|
||||
async searchFaces({ ownerId, embedding, numResults, maxDistance }: EmbeddingSearch): Promise<AssetFaceEntity[]> {
|
||||
if (!isValidInteger(numResults, { min: 1 })) {
|
||||
throw new Error(`Invalid value for 'numResults': ${numResults}`);
|
||||
}
|
||||
|
||||
let results: AssetFaceEntity[] = [];
|
||||
await this.assetRepository.manager.transaction(async (manager) => {
|
||||
await manager.query(`SET LOCAL vectors.k = '${numResults}'`);
|
||||
const cte = manager
|
||||
.createQueryBuilder(AssetFaceEntity, 'faces')
|
||||
.addSelect('1 + (faces.embedding <=> :embedding)', 'distance')
|
||||
.innerJoin('faces.asset', 'asset')
|
||||
.where('asset.ownerId = :ownerId')
|
||||
.orderBy(`faces.embedding <=> :embedding`)
|
||||
.setParameters({ ownerId, embedding: asVector(embedding) })
|
||||
.limit(numResults);
|
||||
|
||||
results = await manager
|
||||
.createQueryBuilder()
|
||||
.select('res.*')
|
||||
.addCommonTableExpression(cte, 'cte')
|
||||
.from('cte', 'res')
|
||||
.where('res.distance <= :maxDistance', { maxDistance })
|
||||
.getRawMany();
|
||||
});
|
||||
|
||||
return this.assetFaceRepository.create(results);
|
||||
}
|
||||
|
||||
async upsert(smartInfo: Partial<SmartInfoEntity>, embedding?: Embedding): Promise<void> {
|
||||
await this.repository.upsert(smartInfo, { conflictPaths: ['assetId'] });
|
||||
if (!smartInfo.assetId || !embedding) {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.upsertEmbedding(smartInfo.assetId, embedding);
|
||||
}
|
||||
|
||||
private async upsertEmbedding(assetId: string, embedding: number[]): Promise<void> {
|
||||
if (asyncLock.isBusy(DatabaseLock[DatabaseLock.CLIPDimSize])) {
|
||||
this.logger.verbose(`Waiting for CLIP dimension size to be updated`);
|
||||
await asyncLock.acquire(DatabaseLock[DatabaseLock.CLIPDimSize], () => {});
|
||||
}
|
||||
|
||||
await this.smartSearchRepository.upsert(
|
||||
{ assetId, embedding: () => asVector(embedding, true) },
|
||||
{ conflictPaths: ['assetId'] },
|
||||
);
|
||||
}
|
||||
|
||||
@RequireLock(DatabaseLock.CLIPDimSize)
|
||||
private async updateDimSize(dimSize: number): Promise<void> {
|
||||
if (!isValidInteger(dimSize, { min: 1, max: 2 ** 16 })) {
|
||||
throw new Error(`Invalid CLIP dimension size: ${dimSize}`);
|
||||
}
|
||||
|
||||
const curDimSize = await this.getDimSize();
|
||||
if (curDimSize === dimSize) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.log(`Updating database CLIP dimension size to ${dimSize}.`);
|
||||
|
||||
await this.smartSearchRepository.manager.transaction(async (manager) => {
|
||||
await manager.query(`DROP TABLE smart_search`);
|
||||
|
||||
await manager.query(`
|
||||
CREATE TABLE smart_search (
|
||||
"assetId" uuid PRIMARY KEY REFERENCES assets(id) ON DELETE CASCADE,
|
||||
embedding vector(${dimSize}) NOT NULL )`);
|
||||
|
||||
await manager.query(`
|
||||
CREATE INDEX clip_index ON smart_search
|
||||
USING vectors (embedding cosine_ops) WITH (options = $$
|
||||
[indexing.hnsw]
|
||||
m = 16
|
||||
ef_construction = 300
|
||||
$$)`);
|
||||
});
|
||||
|
||||
this.logger.log(`Successfully updated database CLIP dimension size from ${curDimSize} to ${dimSize}.`);
|
||||
}
|
||||
|
||||
private async getDimSize(): Promise<number> {
|
||||
const res = await this.smartSearchRepository.manager.query(`
|
||||
SELECT atttypmod as dimsize
|
||||
FROM pg_attribute f
|
||||
JOIN pg_class c ON c.oid = f.attrelid
|
||||
WHERE c.relkind = 'r'::char
|
||||
AND f.attnum > 0
|
||||
AND c.relname = 'smart_search'
|
||||
AND f.attname = 'embedding'`);
|
||||
|
||||
const dimSize = res[0]['dimsize'];
|
||||
if (!isValidInteger(dimSize, { min: 1, max: 2 ** 16 })) {
|
||||
throw new Error(`Could not retrieve CLIP dimension size`);
|
||||
}
|
||||
return dimSize;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,503 +0,0 @@
|
||||
import {
|
||||
ISearchRepository,
|
||||
OwnedFaceEntity,
|
||||
SearchCollection,
|
||||
SearchCollectionIndexStatus,
|
||||
SearchExploreItem,
|
||||
SearchFaceFilter,
|
||||
SearchFilter,
|
||||
SearchResult,
|
||||
} from '@app/domain';
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import _, { Dictionary } from 'lodash';
|
||||
import { catchError, filter, firstValueFrom, from, map, mergeMap, of, toArray } from 'rxjs';
|
||||
import { Client } from 'typesense';
|
||||
import { CollectionCreateSchema } from 'typesense/lib/Typesense/Collections';
|
||||
import { DocumentSchema, SearchResponse } from 'typesense/lib/Typesense/Documents';
|
||||
import { AlbumEntity, AssetEntity, AssetFaceEntity } from '../entities';
|
||||
import { typesenseConfig } from '../infra.config';
|
||||
import { albumSchema, assetSchema, faceSchema } from '../typesense-schemas';
|
||||
|
||||
function removeNil<T extends Dictionary<any>>(item: T): T {
|
||||
_.forOwn(item, (value, key) => {
|
||||
if (_.isNil(value) || (_.isObject(value) && !_.isDate(value) && _.isEmpty(removeNil(value)))) {
|
||||
delete item[key];
|
||||
}
|
||||
});
|
||||
|
||||
return item;
|
||||
}
|
||||
|
||||
interface MultiSearchError {
|
||||
code: number;
|
||||
error: string;
|
||||
}
|
||||
|
||||
interface CustomAssetEntity extends AssetEntity {
|
||||
geo?: [number, number];
|
||||
motion?: boolean;
|
||||
people?: string[];
|
||||
}
|
||||
|
||||
const schemaMap: Record<SearchCollection, CollectionCreateSchema> = {
|
||||
[SearchCollection.ASSETS]: assetSchema,
|
||||
[SearchCollection.ALBUMS]: albumSchema,
|
||||
[SearchCollection.FACES]: faceSchema,
|
||||
};
|
||||
|
||||
const schemas = Object.entries(schemaMap) as [SearchCollection, CollectionCreateSchema][];
|
||||
|
||||
@Injectable()
|
||||
export class TypesenseRepository implements ISearchRepository {
|
||||
private logger = new Logger(TypesenseRepository.name);
|
||||
|
||||
private _client: Client | null = null;
|
||||
private _updateCLIPLock = false;
|
||||
|
||||
private get client(): Client {
|
||||
if (!this._client) {
|
||||
throw new Error('Typesense client not available (no apiKey was provided)');
|
||||
}
|
||||
return this._client;
|
||||
}
|
||||
|
||||
constructor() {
|
||||
if (!typesenseConfig.apiKey) {
|
||||
return;
|
||||
}
|
||||
|
||||
this._client = new Client(typesenseConfig);
|
||||
}
|
||||
|
||||
async setup(): Promise<void> {
|
||||
const collections = await this.client.collections().retrieve();
|
||||
for (const collection of collections) {
|
||||
this.logger.debug(`${collection.name} collection has ${collection.num_documents} documents`);
|
||||
// await this.client.collections(collection.name).delete();
|
||||
}
|
||||
|
||||
// upsert collections
|
||||
for (const [collectionName, schema] of schemas) {
|
||||
const collection = await this.client
|
||||
.collections(schema.name)
|
||||
.retrieve()
|
||||
.catch(() => null);
|
||||
if (!collection) {
|
||||
this.logger.log(`Creating schema: ${collectionName}/${schema.name}`);
|
||||
await this.client.collections().create(schema);
|
||||
} else {
|
||||
this.logger.log(`Schema up to date: ${collectionName}/${schema.name}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async checkMigrationStatus(): Promise<SearchCollectionIndexStatus> {
|
||||
const migrationMap: SearchCollectionIndexStatus = {
|
||||
[SearchCollection.ASSETS]: false,
|
||||
[SearchCollection.ALBUMS]: false,
|
||||
[SearchCollection.FACES]: false,
|
||||
};
|
||||
|
||||
// check if alias is using the current schema
|
||||
const { aliases } = await this.client.aliases().retrieve();
|
||||
this.logger.log(`Alias mapping: ${JSON.stringify(aliases)}`);
|
||||
|
||||
for (const [aliasName, schema] of schemas) {
|
||||
const match = aliases.find((alias) => alias.name === aliasName);
|
||||
if (!match || match.collection_name !== schema.name) {
|
||||
migrationMap[aliasName] = true;
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.log(`Collections needing migration: ${JSON.stringify(migrationMap)}`);
|
||||
|
||||
return migrationMap;
|
||||
}
|
||||
|
||||
async importAlbums(items: AlbumEntity[], done: boolean): Promise<void> {
|
||||
await this.import(SearchCollection.ALBUMS, items, done);
|
||||
}
|
||||
|
||||
async importAssets(items: AssetEntity[], done: boolean): Promise<void> {
|
||||
await this.import(SearchCollection.ASSETS, items, done);
|
||||
}
|
||||
|
||||
async importFaces(items: OwnedFaceEntity[], done: boolean): Promise<void> {
|
||||
await this.import(SearchCollection.FACES, items, done);
|
||||
}
|
||||
|
||||
private async import(
|
||||
collection: SearchCollection,
|
||||
items: AlbumEntity[] | AssetEntity[] | OwnedFaceEntity[],
|
||||
done: boolean,
|
||||
): Promise<void> {
|
||||
try {
|
||||
if (items.length > 0) {
|
||||
await this.client.collections(schemaMap[collection].name).documents().import(this.patch(collection, items), {
|
||||
action: 'upsert',
|
||||
dirty_values: 'coerce_or_drop',
|
||||
});
|
||||
}
|
||||
|
||||
if (done) {
|
||||
await this.updateAlias(collection);
|
||||
}
|
||||
} catch (error: any) {
|
||||
await this.handleError(error);
|
||||
}
|
||||
}
|
||||
|
||||
async explore(userId: string): Promise<SearchExploreItem<AssetEntity>[]> {
|
||||
const common = {
|
||||
q: '*',
|
||||
filter_by: [this.buildFilterBy('ownerId', userId, true), this.buildFilterBy('isArchived', false)].join(' && '),
|
||||
per_page: 100,
|
||||
};
|
||||
|
||||
const asset$ = this.client.collections<AssetEntity>(assetSchema.name).documents();
|
||||
|
||||
const { facet_counts: facets } = await asset$.search({
|
||||
...common,
|
||||
query_by: 'originalFileName',
|
||||
facet_by: 'exifInfo.city,smartInfo.objects',
|
||||
max_facet_values: 12,
|
||||
});
|
||||
|
||||
return firstValueFrom(
|
||||
from(facets || []).pipe(
|
||||
mergeMap(
|
||||
(facet) =>
|
||||
from(facet.counts).pipe(
|
||||
mergeMap((count) => {
|
||||
const config = {
|
||||
...common,
|
||||
query_by: 'originalFileName',
|
||||
filter_by: [common.filter_by, this.buildFilterBy(facet.field_name, count.value, true)].join(' && '),
|
||||
per_page: 1,
|
||||
};
|
||||
|
||||
this.logger.verbose(`Explore subquery: "filter_by:${config.filter_by}" (count:${count.count})`);
|
||||
|
||||
return from(asset$.search(config)).pipe(
|
||||
catchError((error: any) => {
|
||||
this.logger.warn(`Explore subquery error: ${error}`, error?.stack);
|
||||
return of({ hits: [] });
|
||||
}),
|
||||
map((result) => ({
|
||||
value: count.value,
|
||||
data: result.hits?.[0]?.document as AssetEntity,
|
||||
})),
|
||||
filter((item) => !!item.data),
|
||||
);
|
||||
}, 5),
|
||||
toArray(),
|
||||
map((items) => ({
|
||||
fieldName: facet.field_name as string,
|
||||
items,
|
||||
})),
|
||||
),
|
||||
3,
|
||||
),
|
||||
toArray(),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
async deleteAlbums(ids: string[]): Promise<void> {
|
||||
await this.delete(SearchCollection.ALBUMS, ids);
|
||||
}
|
||||
|
||||
async deleteAssets(ids: string[]): Promise<void> {
|
||||
await this.delete(SearchCollection.ASSETS, ids);
|
||||
}
|
||||
|
||||
async deleteFaces(ids: string[]): Promise<void> {
|
||||
await this.delete(SearchCollection.FACES, ids);
|
||||
}
|
||||
|
||||
async deleteAllFaces(): Promise<number> {
|
||||
const records = await this.client.collections(faceSchema.name).documents().delete({ filter_by: 'ownerId:!=null' });
|
||||
return records.num_deleted;
|
||||
}
|
||||
|
||||
async deleteAllAssets(): Promise<number> {
|
||||
const records = await this.client.collections(assetSchema.name).documents().delete({ filter_by: 'ownerId:!=null' });
|
||||
return records.num_deleted;
|
||||
}
|
||||
|
||||
async updateCLIPField(num_dim: number): Promise<void> {
|
||||
const clipField = assetSchema.fields?.find((field) => field.name === 'smartInfo.clipEmbedding');
|
||||
if (clipField && !this._updateCLIPLock) {
|
||||
try {
|
||||
this._updateCLIPLock = true;
|
||||
clipField.num_dim = num_dim;
|
||||
await this.deleteAllAssets();
|
||||
await this.client
|
||||
.collections(assetSchema.name)
|
||||
.update({ fields: [{ name: 'smartInfo.clipEmbedding', drop: true } as any, clipField] });
|
||||
this.logger.log(`Successfully updated CLIP dimensions to ${num_dim}`);
|
||||
} catch (err: any) {
|
||||
this.logger.error(`Error while updating CLIP field: ${err.message}`);
|
||||
} finally {
|
||||
this._updateCLIPLock = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async delete(collection: SearchCollection, ids: string[]): Promise<void> {
|
||||
await this.client
|
||||
.collections(schemaMap[collection].name)
|
||||
.documents()
|
||||
.delete({ filter_by: this.buildFilterBy('id', ids, true) });
|
||||
}
|
||||
|
||||
async searchAlbums(query: string, filters: SearchFilter): Promise<SearchResult<AlbumEntity>> {
|
||||
const results = await this.client
|
||||
.collections<AlbumEntity>(albumSchema.name)
|
||||
.documents()
|
||||
.search({
|
||||
q: query,
|
||||
query_by: ['albumName', 'description'].join(','),
|
||||
filter_by: this.getAlbumFilters(filters),
|
||||
});
|
||||
|
||||
return this.asResponse(results, filters.debug);
|
||||
}
|
||||
|
||||
async searchAssets(query: string, filters: SearchFilter): Promise<SearchResult<AssetEntity>> {
|
||||
const results = await this.client
|
||||
.collections<AssetEntity>(assetSchema.name)
|
||||
.documents()
|
||||
.search({
|
||||
q: query,
|
||||
query_by: [
|
||||
'originalFileName',
|
||||
'exifInfo.country',
|
||||
'exifInfo.state',
|
||||
'exifInfo.city',
|
||||
'exifInfo.description',
|
||||
'exifInfo.model',
|
||||
'exifInfo.make',
|
||||
'smartInfo.tags',
|
||||
'smartInfo.objects',
|
||||
'people',
|
||||
].join(','),
|
||||
per_page: 250,
|
||||
facet_by: this.getFacetFieldNames(SearchCollection.ASSETS),
|
||||
filter_by: this.getAssetFilters(filters),
|
||||
sort_by: filters.recent ? 'createdAt:desc' : undefined,
|
||||
});
|
||||
|
||||
return this.asResponse(results, filters.debug);
|
||||
}
|
||||
|
||||
async searchFaces(input: number[], filters: SearchFaceFilter): Promise<SearchResult<AssetFaceEntity>> {
|
||||
const { results } = await this.client.multiSearch.perform({
|
||||
searches: [
|
||||
{
|
||||
collection: faceSchema.name,
|
||||
q: '*',
|
||||
vector_query: `embedding:([${input.join(',')}], k:5)`,
|
||||
per_page: 5,
|
||||
filter_by: this.buildFilterBy('ownerId', filters.ownerId, true),
|
||||
} as any,
|
||||
],
|
||||
});
|
||||
|
||||
return this.asResponse(results[0] as SearchResponse<AssetFaceEntity>);
|
||||
}
|
||||
|
||||
async vectorSearch(input: number[], filters: SearchFilter): Promise<SearchResult<AssetEntity>> {
|
||||
const { results } = await this.client.multiSearch.perform({
|
||||
searches: [
|
||||
{
|
||||
collection: assetSchema.name,
|
||||
q: '*',
|
||||
vector_query: `smartInfo.clipEmbedding:([${input.join(',')}], k:100)`,
|
||||
per_page: 100,
|
||||
facet_by: this.getFacetFieldNames(SearchCollection.ASSETS),
|
||||
filter_by: this.getAssetFilters(filters),
|
||||
} as any,
|
||||
],
|
||||
});
|
||||
|
||||
return this.asResponse(results[0] as SearchResponse<AssetEntity>, filters.debug);
|
||||
}
|
||||
|
||||
private asResponse<T extends DocumentSchema>(
|
||||
resultsOrError: SearchResponse<T> | MultiSearchError,
|
||||
debug?: boolean,
|
||||
): SearchResult<T> {
|
||||
const { error, code } = resultsOrError as MultiSearchError;
|
||||
if (error) {
|
||||
throw new Error(`Typesense multi-search error: ${code} - ${error}`);
|
||||
}
|
||||
|
||||
const results = resultsOrError as SearchResponse<T>;
|
||||
|
||||
return {
|
||||
page: results.page,
|
||||
total: results.found,
|
||||
count: results.out_of,
|
||||
items: (results.hits || []).map((hit) => hit.document),
|
||||
distances: (results.hits || []).map((hit: any) => hit.vector_distance),
|
||||
facets: (results.facet_counts || []).map((facet) => ({
|
||||
counts: facet.counts.map((item) => ({ count: item.count, value: item.value })),
|
||||
fieldName: facet.field_name as string,
|
||||
})),
|
||||
debug: debug ? results : undefined,
|
||||
} as SearchResult<T>;
|
||||
}
|
||||
|
||||
private async handleError(error: any) {
|
||||
this.logger.error('Unable to index documents');
|
||||
const results = error.importResults || [];
|
||||
let dimsChanged = false;
|
||||
for (const result of results) {
|
||||
try {
|
||||
result.document = JSON.parse(result.document);
|
||||
if (result.error.includes('Field `smartInfo.clipEmbedding` must have')) {
|
||||
dimsChanged = true;
|
||||
this.logger.warn(
|
||||
`CLIP embedding dimensions have changed, now ${result.document.smartInfo.clipEmbedding.length} dims. Updating schema...`,
|
||||
);
|
||||
await this.updateCLIPField(result.document.smartInfo.clipEmbedding.length);
|
||||
break;
|
||||
}
|
||||
|
||||
if (result.document?.smartInfo?.clipEmbedding) {
|
||||
result.document.smartInfo.clipEmbedding = '<truncated>';
|
||||
}
|
||||
} catch (err: any) {
|
||||
this.logger.error(`Error while updating CLIP field: ${(err.message, err.stack)}`);
|
||||
}
|
||||
}
|
||||
|
||||
if (!dimsChanged) {
|
||||
this.logger.log(JSON.stringify(results, null, 2));
|
||||
}
|
||||
}
|
||||
private async updateAlias(collection: SearchCollection) {
|
||||
const schema = schemaMap[collection];
|
||||
const alias = await this.client
|
||||
.aliases(collection)
|
||||
.retrieve()
|
||||
.catch(() => null);
|
||||
|
||||
// update alias to current collection
|
||||
this.logger.log(`Using new schema: ${alias?.collection_name || '(unset)'} => ${schema.name}`);
|
||||
await this.client.aliases().upsert(collection, { collection_name: schema.name });
|
||||
|
||||
// delete previous collection
|
||||
if (alias && alias.collection_name !== schema.name) {
|
||||
this.logger.log(`Deleting old schema: ${alias.collection_name}`);
|
||||
await this.client.collections(alias.collection_name).delete();
|
||||
}
|
||||
}
|
||||
|
||||
private patch(collection: SearchCollection, items: AssetEntity[] | AlbumEntity[] | OwnedFaceEntity[]) {
|
||||
return items.map((item) => {
|
||||
switch (collection) {
|
||||
case SearchCollection.ASSETS:
|
||||
return this.patchAsset(item as AssetEntity);
|
||||
case SearchCollection.ALBUMS:
|
||||
return this.patchAlbum(item as AlbumEntity);
|
||||
case SearchCollection.FACES:
|
||||
return this.patchFace(item as OwnedFaceEntity);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private patchAlbum(album: AlbumEntity): AlbumEntity {
|
||||
return removeNil(album);
|
||||
}
|
||||
|
||||
private patchAsset(asset: AssetEntity): CustomAssetEntity {
|
||||
let custom = asset as CustomAssetEntity;
|
||||
|
||||
const lat = asset.exifInfo?.latitude;
|
||||
const lng = asset.exifInfo?.longitude;
|
||||
if (lat && lng && lat !== 0 && lng !== 0) {
|
||||
custom = { ...custom, geo: [lat, lng] };
|
||||
}
|
||||
const people = asset.faces
|
||||
?.filter((face) => !face.person?.isHidden && face.person?.name)
|
||||
.map((face) => face.person?.name)
|
||||
.filter((name) => name !== undefined) as string[];
|
||||
if (people.length) {
|
||||
custom = { ...custom, people };
|
||||
}
|
||||
return removeNil({ ...custom, motion: !!asset.livePhotoVideoId });
|
||||
}
|
||||
|
||||
private patchFace(face: OwnedFaceEntity): OwnedFaceEntity {
|
||||
return removeNil(face);
|
||||
}
|
||||
|
||||
private getFacetFieldNames(collection: SearchCollection) {
|
||||
return (schemaMap[collection].fields || [])
|
||||
.filter((field) => field.facet)
|
||||
.map((field) => field.name)
|
||||
.join(',');
|
||||
}
|
||||
|
||||
private getAlbumFilters(filters: SearchFilter) {
|
||||
const { userId } = filters;
|
||||
|
||||
const _filters = [this.buildFilterBy('ownerId', userId, true)];
|
||||
|
||||
if (filters.id) {
|
||||
_filters.push(this.buildFilterBy('id', filters.id, true));
|
||||
}
|
||||
|
||||
for (const item of albumSchema.fields || []) {
|
||||
const value = filters[item.name as keyof SearchFilter];
|
||||
if (item.facet && value !== undefined) {
|
||||
_filters.push(this.buildFilterBy(item.name, value));
|
||||
}
|
||||
}
|
||||
|
||||
const result = _filters.join(' && ');
|
||||
|
||||
this.logger.debug(`Album filters are: ${result}`);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private getAssetFilters(filters: SearchFilter) {
|
||||
const { userId } = filters;
|
||||
const _filters = [this.buildFilterBy('ownerId', userId, true), this.buildFilterBy('isArchived', false)];
|
||||
|
||||
if (filters.id) {
|
||||
_filters.push(this.buildFilterBy('id', filters.id, true));
|
||||
}
|
||||
|
||||
for (const item of assetSchema.fields || []) {
|
||||
const value = filters[item.name as keyof SearchFilter];
|
||||
if (item.facet && value !== undefined) {
|
||||
_filters.push(this.buildFilterBy(item.name, value));
|
||||
}
|
||||
}
|
||||
|
||||
const result = _filters.join(' && ');
|
||||
|
||||
this.logger.debug(`Asset filters are: ${result}`);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private buildFilterBy(key: string, values: boolean | string | string[], exact?: boolean) {
|
||||
const token = exact ? ':=' : ':';
|
||||
|
||||
const _values = (Array.isArray(values) ? values : [values]).map((value) => {
|
||||
if (typeof value === 'boolean' || value === 'true' || value === 'false') {
|
||||
return value;
|
||||
}
|
||||
return '`' + value + '`';
|
||||
});
|
||||
|
||||
const value = _values.length > 1 ? `[${_values.join(',')}]` : _values[0];
|
||||
|
||||
return `${key}${token}${value}`;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user