feat(server): efficient full app sync (#8755)

* feat(server): efficient full app sync

* add SQL, fix test compile issues

* fix linter warning

* new sync controller+service, add tests

* enable new sync controller+service

* Update server/src/services/sync.service.ts

Co-authored-by: Daniel Dietzler <36593685+danieldietzler@users.noreply.github.com>

---------

Co-authored-by: Daniel Dietzler <36593685+danieldietzler@users.noreply.github.com>
This commit is contained in:
Fynn Petersen-Frey
2024-04-16 07:26:37 +02:00
committed by GitHub
parent 58e516c766
commit 103cb60a57
26 changed files with 1178 additions and 13 deletions
+3 -3
View File
@@ -61,13 +61,13 @@ describe(AuditService.name, () => {
expect(auditMock.getAfter).toHaveBeenCalledWith(date, {
action: DatabaseAction.DELETE,
ownerId: authStub.admin.user.id,
userIds: [authStub.admin.user.id],
entityType: EntityType.ASSET,
});
});
it('should get any new or updated assets and deleted ids', async () => {
auditMock.getAfter.mockResolvedValue([auditStub.delete]);
auditMock.getAfter.mockResolvedValue([auditStub.delete.entityId]);
const date = new Date();
await expect(sut.getDeletes(authStub.admin, { after: date, entityType: EntityType.ASSET })).resolves.toEqual({
@@ -77,7 +77,7 @@ describe(AuditService.name, () => {
expect(auditMock.getAfter).toHaveBeenCalledWith(date, {
action: DatabaseAction.DELETE,
ownerId: authStub.admin.user.id,
userIds: [authStub.admin.user.id],
entityType: EntityType.ASSET,
});
});
+2 -2
View File
@@ -53,7 +53,7 @@ export class AuditService {
await this.access.requirePermission(auth, Permission.TIMELINE_READ, userId);
const audits = await this.repository.getAfter(dto.after, {
ownerId: userId,
userIds: [userId],
entityType: dto.entityType,
action: DatabaseAction.DELETE,
});
@@ -62,7 +62,7 @@ export class AuditService {
return {
needsFullSync: duration > AUDIT_LOG_MAX_DURATION,
ids: audits.map(({ entityId }) => entityId),
ids: audits,
};
}
+2
View File
@@ -22,6 +22,7 @@ import { SharedLinkService } from 'src/services/shared-link.service';
import { SmartInfoService } from 'src/services/smart-info.service';
import { StorageTemplateService } from 'src/services/storage-template.service';
import { StorageService } from 'src/services/storage.service';
import { SyncService } from 'src/services/sync.service';
import { SystemConfigService } from 'src/services/system-config.service';
import { TagService } from 'src/services/tag.service';
import { TimelineService } from 'src/services/timeline.service';
@@ -53,6 +54,7 @@ export const services = [
SmartInfoService,
StorageService,
StorageTemplateService,
SyncService,
SystemConfigService,
TagService,
TimelineService,
+101
View File
@@ -0,0 +1,101 @@
import { mapAsset } from 'src/dtos/asset-response.dto';
import { AssetEntity } from 'src/entities/asset.entity';
import { IAccessRepository } from 'src/interfaces/access.interface';
import { IAssetRepository } from 'src/interfaces/asset.interface';
import { IAuditRepository } from 'src/interfaces/audit.interface';
import { IPartnerRepository } from 'src/interfaces/partner.interface';
import { SyncService } from 'src/services/sync.service';
import { assetStub } from 'test/fixtures/asset.stub';
import { authStub } from 'test/fixtures/auth.stub';
import { partnerStub } from 'test/fixtures/partner.stub';
import { newAccessRepositoryMock } from 'test/repositories/access.repository.mock';
import { newAssetRepositoryMock } from 'test/repositories/asset.repository.mock';
import { newAuditRepositoryMock } from 'test/repositories/audit.repository.mock';
import { newPartnerRepositoryMock } from 'test/repositories/partner.repository.mock';
const untilDate = new Date(2024);
const mapAssetOpts = { auth: authStub.user1, stripMetadata: false, withStack: true };
describe(SyncService.name, () => {
let sut: SyncService;
let accessMock: jest.Mocked<IAccessRepository>;
let assetMock: jest.Mocked<IAssetRepository>;
let partnerMock: jest.Mocked<IPartnerRepository>;
let auditMock: jest.Mocked<IAuditRepository>;
beforeEach(() => {
partnerMock = newPartnerRepositoryMock();
assetMock = newAssetRepositoryMock();
accessMock = newAccessRepositoryMock();
auditMock = newAuditRepositoryMock();
sut = new SyncService(accessMock, assetMock, partnerMock, auditMock);
});
it('should exist', () => {
expect(sut).toBeDefined();
});
describe('getAllAssetsForUserFullSync', () => {
it('should return a list of all assets owned by the user', async () => {
assetMock.getAllForUserFullSync.mockResolvedValue([assetStub.external, assetStub.hasEncodedVideo]);
await expect(
sut.getAllAssetsForUserFullSync(authStub.user1, { limit: 2, updatedUntil: untilDate }),
).resolves.toEqual([
mapAsset(assetStub.external, mapAssetOpts),
mapAsset(assetStub.hasEncodedVideo, mapAssetOpts),
]);
expect(assetMock.getAllForUserFullSync).toHaveBeenCalledWith({
ownerId: authStub.user1.user.id,
updatedUntil: untilDate,
limit: 2,
});
});
});
describe('getChangesForDeltaSync', () => {
it('should return a response requiring a full sync when partners are out of sync', async () => {
partnerMock.getAll.mockResolvedValue([partnerStub.adminToUser1]);
await expect(
sut.getChangesForDeltaSync(authStub.user1, { updatedAfter: new Date(), userIds: [authStub.user1.user.id] }),
).resolves.toEqual({ needsFullSync: true, upserted: [], deleted: [] });
expect(assetMock.getChangedDeltaSync).toHaveBeenCalledTimes(0);
expect(auditMock.getAfter).toHaveBeenCalledTimes(0);
});
it('should return a response requiring a full sync when last sync was too long ago', async () => {
partnerMock.getAll.mockResolvedValue([]);
await expect(
sut.getChangesForDeltaSync(authStub.user1, { updatedAfter: new Date(2000), userIds: [authStub.user1.user.id] }),
).resolves.toEqual({ needsFullSync: true, upserted: [], deleted: [] });
expect(assetMock.getChangedDeltaSync).toHaveBeenCalledTimes(0);
expect(auditMock.getAfter).toHaveBeenCalledTimes(0);
});
it('should return a response requiring a full sync when there are too many changes', async () => {
partnerMock.getAll.mockResolvedValue([]);
assetMock.getChangedDeltaSync.mockResolvedValue(
Array.from<AssetEntity>({ length: 10_000 }).fill(assetStub.image),
);
await expect(
sut.getChangesForDeltaSync(authStub.user1, { updatedAfter: new Date(), userIds: [authStub.user1.user.id] }),
).resolves.toEqual({ needsFullSync: true, upserted: [], deleted: [] });
expect(assetMock.getChangedDeltaSync).toHaveBeenCalledTimes(1);
expect(auditMock.getAfter).toHaveBeenCalledTimes(0);
});
it('should return a response with changes and deletions', async () => {
partnerMock.getAll.mockResolvedValue([]);
assetMock.getChangedDeltaSync.mockResolvedValue([assetStub.image1]);
auditMock.getAfter.mockResolvedValue([assetStub.external.id]);
await expect(
sut.getChangesForDeltaSync(authStub.user1, { updatedAfter: new Date(), userIds: [authStub.user1.user.id] }),
).resolves.toEqual({
needsFullSync: false,
upserted: [mapAsset(assetStub.image1, mapAssetOpts)],
deleted: [assetStub.external.id],
});
expect(assetMock.getChangedDeltaSync).toHaveBeenCalledTimes(1);
expect(auditMock.getAfter).toHaveBeenCalledTimes(1);
});
});
});
+77
View File
@@ -0,0 +1,77 @@
import { Inject } from '@nestjs/common';
import _ from 'lodash';
import { DateTime } from 'luxon';
import { AUDIT_LOG_MAX_DURATION } from 'src/constants';
import { AccessCore, Permission } from 'src/cores/access.core';
import { AssetResponseDto, mapAsset } from 'src/dtos/asset-response.dto';
import { AuthDto } from 'src/dtos/auth.dto';
import { AssetDeltaSyncDto, AssetDeltaSyncResponseDto, AssetFullSyncDto } from 'src/dtos/sync.dto';
import { DatabaseAction, EntityType } from 'src/entities/audit.entity';
import { IAccessRepository } from 'src/interfaces/access.interface';
import { IAssetRepository } from 'src/interfaces/asset.interface';
import { IAuditRepository } from 'src/interfaces/audit.interface';
import { IPartnerRepository } from 'src/interfaces/partner.interface';
export class SyncService {
private access: AccessCore;
constructor(
@Inject(IAccessRepository) accessRepository: IAccessRepository,
@Inject(IAssetRepository) private assetRepository: IAssetRepository,
@Inject(IPartnerRepository) private partnerRepository: IPartnerRepository,
@Inject(IAuditRepository) private auditRepository: IAuditRepository,
) {
this.access = AccessCore.create(accessRepository);
}
async getAllAssetsForUserFullSync(auth: AuthDto, dto: AssetFullSyncDto): Promise<AssetResponseDto[]> {
const userId = dto.userId || auth.user.id;
await this.access.requirePermission(auth, Permission.TIMELINE_READ, userId);
const assets = await this.assetRepository.getAllForUserFullSync({
ownerId: userId,
lastCreationDate: dto.lastCreationDate,
updatedUntil: dto.updatedUntil,
lastId: dto.lastId,
limit: dto.limit,
});
const options = { auth, stripMetadata: false, withStack: true };
return assets.map((a) => mapAsset(a, options));
}
async getChangesForDeltaSync(auth: AuthDto, dto: AssetDeltaSyncDto): Promise<AssetDeltaSyncResponseDto> {
await this.access.requirePermission(auth, Permission.TIMELINE_READ, dto.userIds);
const partner = await this.partnerRepository.getAll(auth.user.id);
const userIds = [auth.user.id, ...partner.filter((p) => p.sharedWithId == auth.user.id).map((p) => p.sharedById)];
userIds.sort();
dto.userIds.sort();
const duration = DateTime.now().diff(DateTime.fromJSDate(dto.updatedAfter));
if (!_.isEqual(userIds, dto.userIds) || duration > AUDIT_LOG_MAX_DURATION) {
// app does not have the correct partners synced
// or app has not synced in the last 100 days
return { needsFullSync: true, deleted: [], upserted: [] };
}
const limit = 10_000;
const upserted = await this.assetRepository.getChangedDeltaSync({ limit, updatedAfter: dto.updatedAfter, userIds });
if (upserted.length === limit) {
// too many changes -> do a full sync (paginated) instead
return { needsFullSync: true, deleted: [], upserted: [] };
}
const deleted = await this.auditRepository.getAfter(dto.updatedAfter, {
userIds: userIds,
entityType: EntityType.ASSET,
action: DatabaseAction.DELETE,
});
const options = { auth, stripMetadata: false, withStack: true };
const result = {
needsFullSync: false,
upserted: upserted.map((a) => mapAsset(a, options)),
deleted,
};
return result;
}
}