feat(server): near-duplicate detection (#8228)
* duplicate detection job, entity, config * queueing * job panel, update api * use embedding in db instead of fetching * disable concurrency * only queue visible assets * handle multiple duplicateIds * update concurrent queue check * add provider * add web placeholder, server endpoint, migration, various fixes * update sql * select embedding by default * rename variable * simplify * remove separate entity, handle re-running with different threshold, set default back to 0.02 * fix tests * add tests * add index to entity * formatting * update asset mock * fix `upsertJobStatus` signature * update sql * formatting * default to 0.03 * optimize clustering * use asset's `duplicateId` if present * update sql * update tests * expose admin setting * refactor * formatting * skip if ml is disabled * debug trash e2e * remove from web * remove from sidebar * test if ml is disabled * update sql * separate duplicate detection from clip in config, disable by default for now * fix doc * lower minimum `maxDistance` * update api * Add and Use Duplicate Detection Feature Flag (#9364) * Add Duplicate Detection Flag * Use Duplicate Detection Flag * Attempt Fixes for Failing Checks * lower minimum `maxDistance` * fix tests --------- Co-authored-by: mertalev <101130780+mertalev@users.noreply.github.com> * chore: fixes and additions after rebase * chore: update api (remove new Role enum) * fix: left join smart search so getAll works without machine learning * test: trash e2e go back to checking length of assets is zero * chore: regen api after rebase * test: fix tests after rebase * redundant join --------- Co-authored-by: Nicholas Flamy <30300649+NicholasFlamy@users.noreply.github.com> Co-authored-by: Zack Pollard <zackpollard@ymail.com> Co-authored-by: Zack Pollard <zack@futo.org>
This commit is contained in:
@@ -16,15 +16,25 @@ import {
|
||||
} from 'src/dtos/search.dto';
|
||||
import { AssetOrder } from 'src/entities/album.entity';
|
||||
import { AssetEntity } from 'src/entities/asset.entity';
|
||||
import { IAssetRepository } from 'src/interfaces/asset.interface';
|
||||
import { IAssetRepository, WithoutProperty } from 'src/interfaces/asset.interface';
|
||||
import { ICryptoRepository } from 'src/interfaces/crypto.interface';
|
||||
import {
|
||||
IBaseJob,
|
||||
IEntityJob,
|
||||
IJobRepository,
|
||||
JOBS_ASSET_PAGINATION_SIZE,
|
||||
JobName,
|
||||
JobStatus,
|
||||
} from 'src/interfaces/job.interface';
|
||||
import { ILoggerRepository } from 'src/interfaces/logger.interface';
|
||||
import { IMachineLearningRepository } from 'src/interfaces/machine-learning.interface';
|
||||
import { IMetadataRepository } from 'src/interfaces/metadata.interface';
|
||||
import { IPartnerRepository } from 'src/interfaces/partner.interface';
|
||||
import { IPersonRepository } from 'src/interfaces/person.interface';
|
||||
import { ISearchRepository, SearchExploreItem } from 'src/interfaces/search.interface';
|
||||
import { AssetDuplicateResult, ISearchRepository, SearchExploreItem } from 'src/interfaces/search.interface';
|
||||
import { ISystemMetadataRepository } from 'src/interfaces/system-metadata.interface';
|
||||
import { isSmartSearchEnabled } from 'src/utils/misc';
|
||||
import { isDuplicateDetectionEnabled, isSmartSearchEnabled } from 'src/utils/misc';
|
||||
import { usePagination } from 'src/utils/pagination';
|
||||
|
||||
@Injectable()
|
||||
export class SearchService {
|
||||
@@ -39,6 +49,8 @@ export class SearchService {
|
||||
@Inject(IPartnerRepository) private partnerRepository: IPartnerRepository,
|
||||
@Inject(IMetadataRepository) private metadataRepository: IMetadataRepository,
|
||||
@Inject(ILoggerRepository) private logger: ILoggerRepository,
|
||||
@Inject(ICryptoRepository) private cryptoRepository: ICryptoRepository,
|
||||
@Inject(IJobRepository) private jobRepository: IJobRepository,
|
||||
) {
|
||||
this.logger.setContext(SearchService.name);
|
||||
this.configCore = SystemConfigCore.create(systemMetadataRepository, logger);
|
||||
@@ -147,6 +159,97 @@ export class SearchService {
|
||||
}
|
||||
}
|
||||
|
||||
async handleQueueSearchDuplicates({ force }: IBaseJob): Promise<JobStatus> {
|
||||
const { machineLearning } = await this.configCore.getConfig();
|
||||
if (!isDuplicateDetectionEnabled(machineLearning)) {
|
||||
return JobStatus.SKIPPED;
|
||||
}
|
||||
|
||||
const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => {
|
||||
return force
|
||||
? this.assetRepository.getAll(pagination, { isVisible: true })
|
||||
: this.assetRepository.getWithout(pagination, WithoutProperty.DUPLICATE);
|
||||
});
|
||||
|
||||
for await (const assets of assetPagination) {
|
||||
await this.jobRepository.queueAll(
|
||||
assets.map((asset) => ({ name: JobName.DUPLICATE_DETECTION, data: { id: asset.id } })),
|
||||
);
|
||||
}
|
||||
|
||||
return JobStatus.SUCCESS;
|
||||
}
|
||||
|
||||
async handleSearchDuplicates({ id }: IEntityJob): Promise<JobStatus> {
|
||||
const { machineLearning } = await this.configCore.getConfig();
|
||||
if (!isDuplicateDetectionEnabled(machineLearning)) {
|
||||
return JobStatus.SKIPPED;
|
||||
}
|
||||
|
||||
const asset = await this.assetRepository.getById(id, { smartSearch: true });
|
||||
if (!asset) {
|
||||
this.logger.error(`Asset ${id} not found`);
|
||||
return JobStatus.FAILED;
|
||||
}
|
||||
|
||||
if (!asset.isVisible) {
|
||||
this.logger.debug(`Asset ${id} is not visible, skipping`);
|
||||
return JobStatus.SKIPPED;
|
||||
}
|
||||
|
||||
if (!asset.previewPath) {
|
||||
this.logger.warn(`Asset ${id} is missing preview image`);
|
||||
return JobStatus.FAILED;
|
||||
}
|
||||
|
||||
if (!asset.smartSearch?.embedding) {
|
||||
this.logger.debug(`Asset ${id} is missing embedding`);
|
||||
return JobStatus.FAILED;
|
||||
}
|
||||
|
||||
const duplicateAssets = await this.searchRepository.searchDuplicates({
|
||||
assetId: asset.id,
|
||||
embedding: asset.smartSearch.embedding,
|
||||
maxDistance: machineLearning.duplicateDetection.maxDistance,
|
||||
userIds: [asset.ownerId],
|
||||
});
|
||||
|
||||
let assetIds = [asset.id];
|
||||
if (duplicateAssets.length > 0) {
|
||||
this.logger.debug(
|
||||
`Found ${duplicateAssets.length} duplicate${duplicateAssets.length === 1 ? '' : 's'} for asset ${asset.id}`,
|
||||
);
|
||||
assetIds = await this.updateDuplicates(asset, duplicateAssets);
|
||||
} else if (asset.duplicateId) {
|
||||
this.logger.debug(`No duplicates found for asset ${asset.id}, removing duplicateId`);
|
||||
await this.assetRepository.update({ id: asset.id, duplicateId: null });
|
||||
}
|
||||
|
||||
const duplicatesDetectedAt = new Date();
|
||||
await this.assetRepository.upsertJobStatus(...assetIds.map((assetId) => ({ assetId, duplicatesDetectedAt })));
|
||||
|
||||
return JobStatus.SUCCESS;
|
||||
}
|
||||
|
||||
private async updateDuplicates(asset: AssetEntity, duplicateAssets: AssetDuplicateResult[]): Promise<string[]> {
|
||||
const duplicateIds = [
|
||||
...new Set(
|
||||
duplicateAssets
|
||||
.filter((asset): asset is AssetDuplicateResult & { duplicateId: string } => !!asset.duplicateId)
|
||||
.map((duplicate) => duplicate.duplicateId),
|
||||
),
|
||||
];
|
||||
|
||||
const targetDuplicateId = asset.duplicateId ?? duplicateIds.shift() ?? this.cryptoRepository.randomUUID();
|
||||
const assetIdsToUpdate = duplicateAssets
|
||||
.filter((asset) => asset.duplicateId !== targetDuplicateId)
|
||||
.map((duplicate) => duplicate.assetId);
|
||||
assetIdsToUpdate.push(asset.id);
|
||||
|
||||
await this.assetRepository.updateDuplicates({ targetDuplicateId, assetIds: assetIdsToUpdate, duplicateIds });
|
||||
return assetIdsToUpdate;
|
||||
}
|
||||
|
||||
private async getUserIdsToSearch(auth: AuthDto): Promise<string[]> {
|
||||
const userIds: string[] = [auth.user.id];
|
||||
const partners = await this.partnerRepository.getAll(auth.user.id);
|
||||
|
||||
Reference in New Issue
Block a user