refactor: new helper methods that work for all sync queries (#20690)
refactor: new helper methods that work for all sync queries
This commit is contained in:
@@ -183,7 +183,7 @@ export class SyncService extends BaseService {
|
||||
|
||||
private async syncAuthUsersV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
|
||||
const upsertType = SyncEntityType.AuthUserV1;
|
||||
const upserts = this.syncRepository.authUser.getUpserts(options, checkpointMap[upsertType]);
|
||||
const upserts = this.syncRepository.authUser.getUpserts({ ...options, ack: checkpointMap[upsertType] });
|
||||
for await (const { updateId, profileImagePath, ...data } of upserts) {
|
||||
send(response, { type: upsertType, ids: [updateId], data: { ...data, hasProfileImage: !!profileImagePath } });
|
||||
}
|
||||
@@ -191,13 +191,13 @@ export class SyncService extends BaseService {
|
||||
|
||||
private async syncUsersV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
|
||||
const deleteType = SyncEntityType.UserDeleteV1;
|
||||
const deletes = this.syncRepository.user.getDeletes(options, checkpointMap[deleteType]);
|
||||
const deletes = this.syncRepository.user.getDeletes({ ...options, ack: checkpointMap[deleteType] });
|
||||
for await (const { id, ...data } of deletes) {
|
||||
send(response, { type: deleteType, ids: [id], data });
|
||||
}
|
||||
|
||||
const upsertType = SyncEntityType.UserV1;
|
||||
const upserts = this.syncRepository.user.getUpserts(options, checkpointMap[upsertType]);
|
||||
const upserts = this.syncRepository.user.getUpserts({ ...options, ack: checkpointMap[upsertType] });
|
||||
for await (const { updateId, profileImagePath, ...data } of upserts) {
|
||||
send(response, { type: upsertType, ids: [updateId], data: { ...data, hasProfileImage: !!profileImagePath } });
|
||||
}
|
||||
@@ -205,13 +205,13 @@ export class SyncService extends BaseService {
|
||||
|
||||
private async syncPartnersV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
|
||||
const deleteType = SyncEntityType.PartnerDeleteV1;
|
||||
const deletes = this.syncRepository.partner.getDeletes(options, checkpointMap[deleteType]);
|
||||
const deletes = this.syncRepository.partner.getDeletes({ ...options, ack: checkpointMap[deleteType] });
|
||||
for await (const { id, ...data } of deletes) {
|
||||
send(response, { type: deleteType, ids: [id], data });
|
||||
}
|
||||
|
||||
const upsertType = SyncEntityType.PartnerV1;
|
||||
const upserts = this.syncRepository.partner.getUpserts(options, checkpointMap[upsertType]);
|
||||
const upserts = this.syncRepository.partner.getUpserts({ ...options, ack: checkpointMap[upsertType] });
|
||||
for await (const { updateId, ...data } of upserts) {
|
||||
send(response, { type: upsertType, ids: [updateId], data });
|
||||
}
|
||||
@@ -219,13 +219,13 @@ export class SyncService extends BaseService {
|
||||
|
||||
private async syncAssetsV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
|
||||
const deleteType = SyncEntityType.AssetDeleteV1;
|
||||
const deletes = this.syncRepository.asset.getDeletes(options, checkpointMap[deleteType]);
|
||||
const deletes = this.syncRepository.asset.getDeletes({ ...options, ack: checkpointMap[deleteType] });
|
||||
for await (const { id, ...data } of deletes) {
|
||||
send(response, { type: deleteType, ids: [id], data });
|
||||
}
|
||||
|
||||
const upsertType = SyncEntityType.AssetV1;
|
||||
const upserts = this.syncRepository.asset.getUpserts(options, checkpointMap[upsertType]);
|
||||
const upserts = this.syncRepository.asset.getUpserts({ ...options, ack: checkpointMap[upsertType] });
|
||||
for await (const { updateId, ...data } of upserts) {
|
||||
send(response, { type: upsertType, ids: [updateId], data: mapSyncAssetV1(data) });
|
||||
}
|
||||
@@ -238,14 +238,17 @@ export class SyncService extends BaseService {
|
||||
sessionId: string,
|
||||
) {
|
||||
const deleteType = SyncEntityType.PartnerAssetDeleteV1;
|
||||
const deletes = this.syncRepository.partnerAsset.getDeletes(options, checkpointMap[deleteType]);
|
||||
const deletes = this.syncRepository.partnerAsset.getDeletes({ ...options, ack: checkpointMap[deleteType] });
|
||||
for await (const { id, ...data } of deletes) {
|
||||
send(response, { type: deleteType, ids: [id], data });
|
||||
}
|
||||
|
||||
const backfillType = SyncEntityType.PartnerAssetBackfillV1;
|
||||
const backfillCheckpoint = checkpointMap[backfillType];
|
||||
const partners = await this.syncRepository.partner.getCreatedAfter(options, backfillCheckpoint?.updateId);
|
||||
const partners = await this.syncRepository.partner.getCreatedAfter({
|
||||
...options,
|
||||
afterCreateId: backfillCheckpoint?.updateId,
|
||||
});
|
||||
const upsertType = SyncEntityType.PartnerAssetV1;
|
||||
const upsertCheckpoint = checkpointMap[upsertType];
|
||||
if (upsertCheckpoint) {
|
||||
@@ -258,7 +261,10 @@ export class SyncService extends BaseService {
|
||||
}
|
||||
|
||||
const startId = getStartId(createId, backfillCheckpoint);
|
||||
const backfill = this.syncRepository.partnerAsset.getBackfill(options, partner.sharedById, startId, endId);
|
||||
const backfill = this.syncRepository.partnerAsset.getBackfill(
|
||||
{ ...options, afterUpdateId: startId, beforeUpdateId: endId },
|
||||
partner.sharedById,
|
||||
);
|
||||
|
||||
for await (const { updateId, ...data } of backfill) {
|
||||
send(response, {
|
||||
@@ -278,7 +284,7 @@ export class SyncService extends BaseService {
|
||||
});
|
||||
}
|
||||
|
||||
const upserts = this.syncRepository.partnerAsset.getUpserts(options, checkpointMap[upsertType]);
|
||||
const upserts = this.syncRepository.partnerAsset.getUpserts({ ...options, ack: checkpointMap[upsertType] });
|
||||
for await (const { updateId, ...data } of upserts) {
|
||||
send(response, { type: upsertType, ids: [updateId], data: mapSyncAssetV1(data) });
|
||||
}
|
||||
@@ -286,7 +292,7 @@ export class SyncService extends BaseService {
|
||||
|
||||
private async syncAssetExifsV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
|
||||
const upsertType = SyncEntityType.AssetExifV1;
|
||||
const upserts = this.syncRepository.assetExif.getUpserts(options, checkpointMap[upsertType]);
|
||||
const upserts = this.syncRepository.assetExif.getUpserts({ ...options, ack: checkpointMap[upsertType] });
|
||||
for await (const { updateId, ...data } of upserts) {
|
||||
send(response, { type: upsertType, ids: [updateId], data });
|
||||
}
|
||||
@@ -300,7 +306,10 @@ export class SyncService extends BaseService {
|
||||
) {
|
||||
const backfillType = SyncEntityType.PartnerAssetExifBackfillV1;
|
||||
const backfillCheckpoint = checkpointMap[backfillType];
|
||||
const partners = await this.syncRepository.partner.getCreatedAfter(options, backfillCheckpoint?.updateId);
|
||||
const partners = await this.syncRepository.partner.getCreatedAfter({
|
||||
...options,
|
||||
afterCreateId: backfillCheckpoint?.updateId,
|
||||
});
|
||||
|
||||
const upsertType = SyncEntityType.PartnerAssetExifV1;
|
||||
const upsertCheckpoint = checkpointMap[upsertType];
|
||||
@@ -314,7 +323,10 @@ export class SyncService extends BaseService {
|
||||
}
|
||||
|
||||
const startId = getStartId(createId, backfillCheckpoint);
|
||||
const backfill = this.syncRepository.partnerAssetExif.getBackfill(options, partner.sharedById, startId, endId);
|
||||
const backfill = this.syncRepository.partnerAssetExif.getBackfill(
|
||||
{ ...options, afterUpdateId: startId, beforeUpdateId: endId },
|
||||
partner.sharedById,
|
||||
);
|
||||
|
||||
for await (const { updateId, ...data } of backfill) {
|
||||
send(response, { type: backfillType, ids: [partner.createId, updateId], data });
|
||||
@@ -330,7 +342,7 @@ export class SyncService extends BaseService {
|
||||
});
|
||||
}
|
||||
|
||||
const upserts = this.syncRepository.partnerAssetExif.getUpserts(options, checkpointMap[upsertType]);
|
||||
const upserts = this.syncRepository.partnerAssetExif.getUpserts({ ...options, ack: checkpointMap[upsertType] });
|
||||
for await (const { updateId, ...data } of upserts) {
|
||||
send(response, { type: upsertType, ids: [updateId], data });
|
||||
}
|
||||
@@ -338,13 +350,13 @@ export class SyncService extends BaseService {
|
||||
|
||||
private async syncAlbumsV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
|
||||
const deleteType = SyncEntityType.AlbumDeleteV1;
|
||||
const deletes = this.syncRepository.album.getDeletes(options, checkpointMap[deleteType]);
|
||||
const deletes = this.syncRepository.album.getDeletes({ ...options, ack: checkpointMap[deleteType] });
|
||||
for await (const { id, ...data } of deletes) {
|
||||
send(response, { type: deleteType, ids: [id], data });
|
||||
}
|
||||
|
||||
const upsertType = SyncEntityType.AlbumV1;
|
||||
const upserts = this.syncRepository.album.getUpserts(options, checkpointMap[upsertType]);
|
||||
const upserts = this.syncRepository.album.getUpserts({ ...options, ack: checkpointMap[upsertType] });
|
||||
for await (const { updateId, ...data } of upserts) {
|
||||
send(response, { type: upsertType, ids: [updateId], data });
|
||||
}
|
||||
@@ -357,14 +369,17 @@ export class SyncService extends BaseService {
|
||||
sessionId: string,
|
||||
) {
|
||||
const deleteType = SyncEntityType.AlbumUserDeleteV1;
|
||||
const deletes = this.syncRepository.albumUser.getDeletes(options, checkpointMap[deleteType]);
|
||||
const deletes = this.syncRepository.albumUser.getDeletes({ ...options, ack: checkpointMap[deleteType] });
|
||||
for await (const { id, ...data } of deletes) {
|
||||
send(response, { type: deleteType, ids: [id], data });
|
||||
}
|
||||
|
||||
const backfillType = SyncEntityType.AlbumUserBackfillV1;
|
||||
const backfillCheckpoint = checkpointMap[backfillType];
|
||||
const albums = await this.syncRepository.album.getCreatedAfter(options, backfillCheckpoint?.updateId);
|
||||
const albums = await this.syncRepository.album.getCreatedAfter({
|
||||
...options,
|
||||
afterCreateId: backfillCheckpoint?.updateId,
|
||||
});
|
||||
const upsertType = SyncEntityType.AlbumUserV1;
|
||||
const upsertCheckpoint = checkpointMap[upsertType];
|
||||
if (upsertCheckpoint) {
|
||||
@@ -377,7 +392,10 @@ export class SyncService extends BaseService {
|
||||
}
|
||||
|
||||
const startId = getStartId(createId, backfillCheckpoint);
|
||||
const backfill = this.syncRepository.albumUser.getBackfill(options, album.id, startId, endId);
|
||||
const backfill = this.syncRepository.albumUser.getBackfill(
|
||||
{ ...options, afterUpdateId: startId, beforeUpdateId: endId },
|
||||
album.id,
|
||||
);
|
||||
|
||||
for await (const { updateId, ...data } of backfill) {
|
||||
send(response, { type: backfillType, ids: [createId, updateId], data });
|
||||
@@ -393,7 +411,7 @@ export class SyncService extends BaseService {
|
||||
});
|
||||
}
|
||||
|
||||
const upserts = this.syncRepository.albumUser.getUpserts(options, checkpointMap[upsertType]);
|
||||
const upserts = this.syncRepository.albumUser.getUpserts({ ...options, ack: checkpointMap[upsertType] });
|
||||
for await (const { updateId, ...data } of upserts) {
|
||||
send(response, { type: upsertType, ids: [updateId], data });
|
||||
}
|
||||
@@ -407,7 +425,10 @@ export class SyncService extends BaseService {
|
||||
) {
|
||||
const backfillType = SyncEntityType.AlbumAssetBackfillV1;
|
||||
const backfillCheckpoint = checkpointMap[backfillType];
|
||||
const albums = await this.syncRepository.album.getCreatedAfter(options, backfillCheckpoint?.updateId);
|
||||
const albums = await this.syncRepository.album.getCreatedAfter({
|
||||
...options,
|
||||
afterCreateId: backfillCheckpoint?.updateId,
|
||||
});
|
||||
const updateType = SyncEntityType.AlbumAssetUpdateV1;
|
||||
const createType = SyncEntityType.AlbumAssetCreateV1;
|
||||
const updateCheckpoint = checkpointMap[updateType];
|
||||
@@ -422,7 +443,10 @@ export class SyncService extends BaseService {
|
||||
}
|
||||
|
||||
const startId = getStartId(createId, backfillCheckpoint);
|
||||
const backfill = this.syncRepository.albumAsset.getBackfill(options, album.id, startId, endId);
|
||||
const backfill = this.syncRepository.albumAsset.getBackfill(
|
||||
{ ...options, afterUpdateId: startId, beforeUpdateId: endId },
|
||||
album.id,
|
||||
);
|
||||
|
||||
for await (const { updateId, ...data } of backfill) {
|
||||
send(response, { type: backfillType, ids: [createId, updateId], data: mapSyncAssetV1(data) });
|
||||
@@ -439,13 +463,16 @@ export class SyncService extends BaseService {
|
||||
}
|
||||
|
||||
if (createCheckpoint) {
|
||||
const updates = this.syncRepository.albumAsset.getUpdates(options, createCheckpoint, updateCheckpoint);
|
||||
const updates = this.syncRepository.albumAsset.getUpdates(
|
||||
{ ...options, ack: updateCheckpoint },
|
||||
createCheckpoint,
|
||||
);
|
||||
for await (const { updateId, ...data } of updates) {
|
||||
send(response, { type: updateType, ids: [updateId], data: mapSyncAssetV1(data) });
|
||||
}
|
||||
}
|
||||
|
||||
const creates = this.syncRepository.albumAsset.getCreates(options, createCheckpoint);
|
||||
const creates = this.syncRepository.albumAsset.getCreates({ ...options, ack: createCheckpoint });
|
||||
let first = true;
|
||||
for await (const { updateId, ...data } of creates) {
|
||||
if (first) {
|
||||
@@ -469,7 +496,10 @@ export class SyncService extends BaseService {
|
||||
) {
|
||||
const backfillType = SyncEntityType.AlbumAssetExifBackfillV1;
|
||||
const backfillCheckpoint = checkpointMap[backfillType];
|
||||
const albums = await this.syncRepository.album.getCreatedAfter(options, backfillCheckpoint?.updateId);
|
||||
const albums = await this.syncRepository.album.getCreatedAfter({
|
||||
...options,
|
||||
afterCreateId: backfillCheckpoint?.updateId,
|
||||
});
|
||||
const updateType = SyncEntityType.AlbumAssetExifUpdateV1;
|
||||
const createType = SyncEntityType.AlbumAssetExifCreateV1;
|
||||
const upsertCheckpoint = checkpointMap[updateType];
|
||||
@@ -484,7 +514,10 @@ export class SyncService extends BaseService {
|
||||
}
|
||||
|
||||
const startId = getStartId(createId, backfillCheckpoint);
|
||||
const backfill = this.syncRepository.albumAssetExif.getBackfill(options, album.id, startId, endId);
|
||||
const backfill = this.syncRepository.albumAssetExif.getBackfill(
|
||||
{ ...options, afterUpdateId: startId, beforeUpdateId: endId },
|
||||
album.id,
|
||||
);
|
||||
|
||||
for await (const { updateId, ...data } of backfill) {
|
||||
send(response, { type: backfillType, ids: [createId, updateId], data });
|
||||
@@ -501,13 +534,16 @@ export class SyncService extends BaseService {
|
||||
}
|
||||
|
||||
if (createCheckpoint) {
|
||||
const updates = this.syncRepository.albumAssetExif.getUpdates(options, createCheckpoint, upsertCheckpoint);
|
||||
const updates = this.syncRepository.albumAssetExif.getUpdates(
|
||||
{ ...options, ack: upsertCheckpoint },
|
||||
createCheckpoint,
|
||||
);
|
||||
for await (const { updateId, ...data } of updates) {
|
||||
send(response, { type: updateType, ids: [updateId], data });
|
||||
}
|
||||
}
|
||||
|
||||
const creates = this.syncRepository.albumAssetExif.getCreates(options, createCheckpoint);
|
||||
const creates = this.syncRepository.albumAssetExif.getCreates({ ...options, ack: createCheckpoint });
|
||||
let first = true;
|
||||
for await (const { updateId, ...data } of creates) {
|
||||
if (first) {
|
||||
@@ -530,14 +566,17 @@ export class SyncService extends BaseService {
|
||||
sessionId: string,
|
||||
) {
|
||||
const deleteType = SyncEntityType.AlbumToAssetDeleteV1;
|
||||
const deletes = this.syncRepository.albumToAsset.getDeletes(options, checkpointMap[deleteType]);
|
||||
const deletes = this.syncRepository.albumToAsset.getDeletes({ ...options, ack: checkpointMap[deleteType] });
|
||||
for await (const { id, ...data } of deletes) {
|
||||
send(response, { type: deleteType, ids: [id], data });
|
||||
}
|
||||
|
||||
const backfillType = SyncEntityType.AlbumToAssetBackfillV1;
|
||||
const backfillCheckpoint = checkpointMap[backfillType];
|
||||
const albums = await this.syncRepository.album.getCreatedAfter(options, backfillCheckpoint?.updateId);
|
||||
const albums = await this.syncRepository.album.getCreatedAfter({
|
||||
...options,
|
||||
afterCreateId: backfillCheckpoint?.updateId,
|
||||
});
|
||||
const upsertType = SyncEntityType.AlbumToAssetV1;
|
||||
const upsertCheckpoint = checkpointMap[upsertType];
|
||||
if (upsertCheckpoint) {
|
||||
@@ -550,7 +589,10 @@ export class SyncService extends BaseService {
|
||||
}
|
||||
|
||||
const startId = getStartId(createId, backfillCheckpoint);
|
||||
const backfill = this.syncRepository.albumToAsset.getBackfill(options, album.id, startId, endId);
|
||||
const backfill = this.syncRepository.albumToAsset.getBackfill(
|
||||
{ ...options, afterUpdateId: startId, beforeUpdateId: endId },
|
||||
album.id,
|
||||
);
|
||||
|
||||
for await (const { updateId, ...data } of backfill) {
|
||||
send(response, { type: backfillType, ids: [createId, updateId], data });
|
||||
@@ -566,7 +608,7 @@ export class SyncService extends BaseService {
|
||||
});
|
||||
}
|
||||
|
||||
const upserts = this.syncRepository.albumToAsset.getUpserts(options, checkpointMap[upsertType]);
|
||||
const upserts = this.syncRepository.albumToAsset.getUpserts({ ...options, ack: checkpointMap[upsertType] });
|
||||
for await (const { updateId, ...data } of upserts) {
|
||||
send(response, { type: upsertType, ids: [updateId], data });
|
||||
}
|
||||
@@ -574,13 +616,13 @@ export class SyncService extends BaseService {
|
||||
|
||||
private async syncMemoriesV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
|
||||
const deleteType = SyncEntityType.MemoryDeleteV1;
|
||||
const deletes = this.syncRepository.memory.getDeletes(options, checkpointMap[deleteType]);
|
||||
const deletes = this.syncRepository.memory.getDeletes({ ...options, ack: checkpointMap[deleteType] });
|
||||
for await (const { id, ...data } of deletes) {
|
||||
send(response, { type: deleteType, ids: [id], data });
|
||||
}
|
||||
|
||||
const upsertType = SyncEntityType.MemoryV1;
|
||||
const upserts = this.syncRepository.memory.getUpserts(options, checkpointMap[upsertType]);
|
||||
const upserts = this.syncRepository.memory.getUpserts({ ...options, ack: checkpointMap[upsertType] });
|
||||
for await (const { updateId, ...data } of upserts) {
|
||||
send(response, { type: upsertType, ids: [updateId], data });
|
||||
}
|
||||
@@ -588,13 +630,13 @@ export class SyncService extends BaseService {
|
||||
|
||||
private async syncMemoryAssetsV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
|
||||
const deleteType = SyncEntityType.MemoryToAssetDeleteV1;
|
||||
const deletes = this.syncRepository.memoryToAsset.getDeletes(options, checkpointMap[deleteType]);
|
||||
const deletes = this.syncRepository.memoryToAsset.getDeletes({ ...options, ack: checkpointMap[deleteType] });
|
||||
for await (const { id, ...data } of deletes) {
|
||||
send(response, { type: deleteType, ids: [id], data });
|
||||
}
|
||||
|
||||
const upsertType = SyncEntityType.MemoryToAssetV1;
|
||||
const upserts = this.syncRepository.memoryToAsset.getUpserts(options, checkpointMap[upsertType]);
|
||||
const upserts = this.syncRepository.memoryToAsset.getUpserts({ ...options, ack: checkpointMap[upsertType] });
|
||||
for await (const { updateId, ...data } of upserts) {
|
||||
send(response, { type: upsertType, ids: [updateId], data });
|
||||
}
|
||||
@@ -602,13 +644,13 @@ export class SyncService extends BaseService {
|
||||
|
||||
private async syncStackV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
|
||||
const deleteType = SyncEntityType.StackDeleteV1;
|
||||
const deletes = this.syncRepository.stack.getDeletes(options, checkpointMap[deleteType]);
|
||||
const deletes = this.syncRepository.stack.getDeletes({ ...options, ack: checkpointMap[deleteType] });
|
||||
for await (const { id, ...data } of deletes) {
|
||||
send(response, { type: deleteType, ids: [id], data });
|
||||
}
|
||||
|
||||
const upsertType = SyncEntityType.StackV1;
|
||||
const upserts = this.syncRepository.stack.getUpserts(options, checkpointMap[upsertType]);
|
||||
const upserts = this.syncRepository.stack.getUpserts({ ...options, ack: checkpointMap[upsertType] });
|
||||
for await (const { updateId, ...data } of upserts) {
|
||||
send(response, { type: upsertType, ids: [updateId], data });
|
||||
}
|
||||
@@ -621,14 +663,17 @@ export class SyncService extends BaseService {
|
||||
sessionId: string,
|
||||
) {
|
||||
const deleteType = SyncEntityType.PartnerStackDeleteV1;
|
||||
const deletes = this.syncRepository.partnerStack.getDeletes(options, checkpointMap[deleteType]);
|
||||
const deletes = this.syncRepository.partnerStack.getDeletes({ ...options, ack: checkpointMap[deleteType] });
|
||||
for await (const { id, ...data } of deletes) {
|
||||
send(response, { type: deleteType, ids: [id], data });
|
||||
}
|
||||
|
||||
const backfillType = SyncEntityType.PartnerStackBackfillV1;
|
||||
const backfillCheckpoint = checkpointMap[backfillType];
|
||||
const partners = await this.syncRepository.partner.getCreatedAfter(options, backfillCheckpoint?.updateId);
|
||||
const partners = await this.syncRepository.partner.getCreatedAfter({
|
||||
...options,
|
||||
afterCreateId: backfillCheckpoint?.updateId,
|
||||
});
|
||||
const upsertType = SyncEntityType.PartnerStackV1;
|
||||
const upsertCheckpoint = checkpointMap[upsertType];
|
||||
if (upsertCheckpoint) {
|
||||
@@ -641,7 +686,10 @@ export class SyncService extends BaseService {
|
||||
}
|
||||
|
||||
const startId = getStartId(createId, backfillCheckpoint);
|
||||
const backfill = this.syncRepository.partnerStack.getBackfill(options, partner.sharedById, startId, endId);
|
||||
const backfill = this.syncRepository.partnerStack.getBackfill(
|
||||
{ ...options, afterUpdateId: startId, beforeUpdateId: endId },
|
||||
partner.sharedById,
|
||||
);
|
||||
|
||||
for await (const { updateId, ...data } of backfill) {
|
||||
send(response, {
|
||||
@@ -661,7 +709,7 @@ export class SyncService extends BaseService {
|
||||
});
|
||||
}
|
||||
|
||||
const upserts = this.syncRepository.partnerStack.getUpserts(options, checkpointMap[upsertType]);
|
||||
const upserts = this.syncRepository.partnerStack.getUpserts({ ...options, ack: checkpointMap[upsertType] });
|
||||
for await (const { updateId, ...data } of upserts) {
|
||||
send(response, { type: upsertType, ids: [updateId], data });
|
||||
}
|
||||
@@ -669,13 +717,13 @@ export class SyncService extends BaseService {
|
||||
|
||||
private async syncPeopleV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
|
||||
const deleteType = SyncEntityType.PersonDeleteV1;
|
||||
const deletes = this.syncRepository.people.getDeletes(options, checkpointMap[deleteType]);
|
||||
const deletes = this.syncRepository.people.getDeletes({ ...options, ack: checkpointMap[deleteType] });
|
||||
for await (const { id, ...data } of deletes) {
|
||||
send(response, { type: deleteType, ids: [id], data });
|
||||
}
|
||||
|
||||
const upsertType = SyncEntityType.PersonV1;
|
||||
const upserts = this.syncRepository.people.getUpserts(options, checkpointMap[upsertType]);
|
||||
const upserts = this.syncRepository.people.getUpserts({ ...options, ack: checkpointMap[upsertType] });
|
||||
for await (const { updateId, ...data } of upserts) {
|
||||
send(response, { type: upsertType, ids: [updateId], data });
|
||||
}
|
||||
@@ -683,13 +731,13 @@ export class SyncService extends BaseService {
|
||||
|
||||
private async syncAssetFacesV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
|
||||
const deleteType = SyncEntityType.AssetFaceDeleteV1;
|
||||
const deletes = this.syncRepository.assetFace.getDeletes(options, checkpointMap[deleteType]);
|
||||
const deletes = this.syncRepository.assetFace.getDeletes({ ...options, ack: checkpointMap[deleteType] });
|
||||
for await (const { id, ...data } of deletes) {
|
||||
send(response, { type: deleteType, ids: [id], data });
|
||||
}
|
||||
|
||||
const upsertType = SyncEntityType.AssetFaceV1;
|
||||
const upserts = this.syncRepository.assetFace.getUpserts(options, checkpointMap[upsertType]);
|
||||
const upserts = this.syncRepository.assetFace.getUpserts({ ...options, ack: checkpointMap[upsertType] });
|
||||
for await (const { updateId, ...data } of upserts) {
|
||||
send(response, { type: upsertType, ids: [updateId], data });
|
||||
}
|
||||
@@ -697,14 +745,14 @@ export class SyncService extends BaseService {
|
||||
|
||||
private async syncUserMetadataV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
|
||||
const deleteType = SyncEntityType.UserMetadataDeleteV1;
|
||||
const deletes = this.syncRepository.userMetadata.getDeletes(options, checkpointMap[deleteType]);
|
||||
const deletes = this.syncRepository.userMetadata.getDeletes({ ...options, ack: checkpointMap[deleteType] });
|
||||
|
||||
for await (const { id, ...data } of deletes) {
|
||||
send(response, { type: deleteType, ids: [id], data });
|
||||
}
|
||||
|
||||
const upsertType = SyncEntityType.UserMetadataV1;
|
||||
const upserts = this.syncRepository.userMetadata.getUpserts(options, checkpointMap[upsertType]);
|
||||
const upserts = this.syncRepository.userMetadata.getUpserts({ ...options, ack: checkpointMap[upsertType] });
|
||||
|
||||
for await (const { updateId, ...data } of upserts) {
|
||||
send(response, { type: upsertType, ids: [updateId], data });
|
||||
|
||||
Reference in New Issue
Block a user