feat: ack sync reset (#20703)

This commit is contained in:
Zack Pollard
2025-08-05 21:30:19 +01:00
committed by GitHub
parent 9567a2a560
commit 0a9cbf01d2
9 changed files with 67 additions and 20 deletions
+1 -2
View File
@@ -202,7 +202,6 @@ export type Album = Selectable<AlbumTable> & {
export type AuthSession = { export type AuthSession = {
id: string; id: string;
isPendingSyncReset: boolean;
hasElevatedPermission: boolean; hasElevatedPermission: boolean;
}; };
@@ -309,7 +308,7 @@ export const columns = {
assetFiles: ['asset_file.id', 'asset_file.path', 'asset_file.type'], assetFiles: ['asset_file.id', 'asset_file.path', 'asset_file.type'],
authUser: ['user.id', 'user.name', 'user.email', 'user.isAdmin', 'user.quotaUsageInBytes', 'user.quotaSizeInBytes'], authUser: ['user.id', 'user.name', 'user.email', 'user.isAdmin', 'user.quotaUsageInBytes', 'user.quotaSizeInBytes'],
authApiKey: ['api_key.id', 'api_key.permissions'], authApiKey: ['api_key.id', 'api_key.permissions'],
authSession: ['session.id', 'session.isPendingSyncReset', 'session.updatedAt', 'session.pinExpiresAt'], authSession: ['session.id', 'session.updatedAt', 'session.pinExpiresAt'],
authSharedLink: [ authSharedLink: [
'shared_link.id', 'shared_link.id',
'shared_link.userId', 'shared_link.userId',
+3 -2
View File
@@ -1,4 +1,4 @@
import { IsInt, IsPositive, IsString } from 'class-validator'; import { Equals, IsInt, IsPositive, IsString } from 'class-validator';
import { Session } from 'src/database'; import { Session } from 'src/database';
import { Optional, ValidateBoolean } from 'src/validation'; import { Optional, ValidateBoolean } from 'src/validation';
@@ -22,7 +22,8 @@ export class SessionCreateDto {
export class SessionUpdateDto { export class SessionUpdateDto {
@ValidateBoolean({ optional: true }) @ValidateBoolean({ optional: true })
isPendingSyncReset?: boolean; @Equals(true)
isPendingSyncReset?: true;
} }
export class SessionResponseDto { export class SessionResponseDto {
+8 -1
View File
@@ -10,10 +10,17 @@ from
where where
"id" = $1 "id" = $1
-- SessionRepository.isPendingSyncReset
select
"isPendingSyncReset"
from
"session"
where
"id" = $1
-- SessionRepository.getByToken -- SessionRepository.getByToken
select select
"session"."id", "session"."id",
"session"."isPendingSyncReset",
"session"."updatedAt", "session"."updatedAt",
"session"."pinExpiresAt", "session"."pinExpiresAt",
( (
@@ -37,6 +37,16 @@ export class SessionRepository {
.executeTakeFirst(); .executeTakeFirst();
} }
@GenerateSql({ params: [DummyValue.UUID] })
async isPendingSyncReset(id: string) {
const result = await this.db
.selectFrom('session')
.select(['isPendingSyncReset'])
.where('id', '=', id)
.executeTakeFirst();
return result?.isPendingSyncReset ?? false;
}
@GenerateSql({ params: [DummyValue.STRING] }) @GenerateSql({ params: [DummyValue.STRING] })
getByToken(token: string) { getByToken(token: string) {
return this.db return this.db
-4
View File
@@ -241,7 +241,6 @@ describe(AuthService.name, () => {
const sessionWithToken = { const sessionWithToken = {
id: session.id, id: session.id,
updatedAt: session.updatedAt, updatedAt: session.updatedAt,
isPendingSyncReset: false,
user: factory.authUser(), user: factory.authUser(),
pinExpiresAt: null, pinExpiresAt: null,
}; };
@@ -259,7 +258,6 @@ describe(AuthService.name, () => {
session: { session: {
id: session.id, id: session.id,
hasElevatedPermission: false, hasElevatedPermission: false,
isPendingSyncReset: session.isPendingSyncReset,
}, },
}); });
}); });
@@ -409,7 +407,6 @@ describe(AuthService.name, () => {
id: session.id, id: session.id,
updatedAt: session.updatedAt, updatedAt: session.updatedAt,
user: factory.authUser(), user: factory.authUser(),
isPendingSyncReset: false,
pinExpiresAt: null, pinExpiresAt: null,
}; };
@@ -426,7 +423,6 @@ describe(AuthService.name, () => {
session: { session: {
id: session.id, id: session.id,
hasElevatedPermission: false, hasElevatedPermission: false,
isPendingSyncReset: session.isPendingSyncReset,
}, },
}); });
}); });
-1
View File
@@ -487,7 +487,6 @@ export class AuthService extends BaseService {
user: session.user, user: session.user,
session: { session: {
id: session.id, id: session.id,
isPendingSyncReset: session.isPendingSyncReset,
hasElevatedPermission, hasElevatedPermission,
}, },
}; };
+9 -4
View File
@@ -23,7 +23,7 @@ import { SyncAck } from 'src/types';
import { getMyPartnerIds } from 'src/utils/asset.util'; import { getMyPartnerIds } from 'src/utils/asset.util';
import { hexOrBufferToBase64 } from 'src/utils/bytes'; import { hexOrBufferToBase64 } from 'src/utils/bytes';
import { setIsEqual } from 'src/utils/set'; import { setIsEqual } from 'src/utils/set';
import { fromAck, mapJsonLine, serialize, SerializeOptions, toAck } from 'src/utils/sync'; import { fromAck, serialize, SerializeOptions, toAck } from 'src/utils/sync';
type CheckpointMap = Partial<Record<SyncEntityType, SyncAck>>; type CheckpointMap = Partial<Record<SyncEntityType, SyncAck>>;
type AssetLike = Omit<SyncAssetV1, 'checksum' | 'thumbhash'> & { type AssetLike = Omit<SyncAssetV1, 'checksum' | 'thumbhash'> & {
@@ -100,6 +100,10 @@ export class SyncService extends BaseService {
const checkpoints: Record<string, Insertable<SessionSyncCheckpointTable>> = {}; const checkpoints: Record<string, Insertable<SessionSyncCheckpointTable>> = {};
for (const ack of dto.acks) { for (const ack of dto.acks) {
const { type } = fromAck(ack); const { type } = fromAck(ack);
if (type === SyncEntityType.SyncResetV1) {
await this.sessionRepository.resetSyncProgress(sessionId);
return;
}
// TODO proper ack validation via class validator // TODO proper ack validation via class validator
if (!Object.values(SyncEntityType).includes(type)) { if (!Object.values(SyncEntityType).includes(type)) {
throw new BadRequestException(`Invalid ack type: ${type}`); throw new BadRequestException(`Invalid ack type: ${type}`);
@@ -129,11 +133,12 @@ export class SyncService extends BaseService {
if (dto.reset) { if (dto.reset) {
await this.sessionRepository.resetSyncProgress(session.id); await this.sessionRepository.resetSyncProgress(session.id);
session.isPendingSyncReset = false;
} }
if (session.isPendingSyncReset) { const isPendingSyncReset = await this.sessionRepository.isPendingSyncReset(session.id);
response.write(mapJsonLine({ type: SyncEntityType.SyncResetV1, data: {} }));
if (isPendingSyncReset) {
send(response, { type: SyncEntityType.SyncResetV1, ids: ['reset'], data: {} });
response.end(); response.end();
return; return;
} }
@@ -1,5 +1,6 @@
import { Kysely } from 'kysely'; import { Kysely } from 'kysely';
import { SyncEntityType, SyncRequestType } from 'src/enum'; import { SyncEntityType, SyncRequestType } from 'src/enum';
import { SessionRepository } from 'src/repositories/session.repository';
import { DB } from 'src/schema'; import { DB } from 'src/schema';
import { SyncTestContext } from 'test/medium.factory'; import { SyncTestContext } from 'test/medium.factory';
import { getKyselyDB } from 'test/utils'; import { getKyselyDB } from 'test/utils';
@@ -27,10 +28,12 @@ describe(SyncEntityType.SyncResetV1, () => {
it('should detect a pending sync reset', async () => { it('should detect a pending sync reset', async () => {
const { auth, ctx } = await setup(); const { auth, ctx } = await setup();
auth.session!.isPendingSyncReset = true; await ctx.get(SessionRepository).update(auth.session!.id, {
isPendingSyncReset: true,
});
const response = await ctx.syncStream(auth, [SyncRequestType.AssetsV1]); const response = await ctx.syncStream(auth, [SyncRequestType.AssetsV1]);
expect(response).toEqual([{ type: SyncEntityType.SyncResetV1, data: {} }]); expect(response).toEqual([{ type: SyncEntityType.SyncResetV1, data: {}, ack: 'SyncResetV1|reset' }]);
}); });
it('should not send other dtos when a reset is pending', async () => { it('should not send other dtos when a reset is pending', async () => {
@@ -40,10 +43,12 @@ describe(SyncEntityType.SyncResetV1, () => {
await expect(ctx.syncStream(auth, [SyncRequestType.AssetsV1])).resolves.toHaveLength(1); await expect(ctx.syncStream(auth, [SyncRequestType.AssetsV1])).resolves.toHaveLength(1);
auth.session!.isPendingSyncReset = true; await ctx.get(SessionRepository).update(auth.session!.id, {
isPendingSyncReset: true,
});
await expect(ctx.syncStream(auth, [SyncRequestType.AssetsV1])).resolves.toEqual([ await expect(ctx.syncStream(auth, [SyncRequestType.AssetsV1])).resolves.toEqual([
{ type: SyncEntityType.SyncResetV1, data: {} }, { type: SyncEntityType.SyncResetV1, data: {}, ack: 'SyncResetV1|reset' },
]); ]);
}); });
@@ -52,7 +57,9 @@ describe(SyncEntityType.SyncResetV1, () => {
await ctx.newAsset({ ownerId: user.id }); await ctx.newAsset({ ownerId: user.id });
auth.session!.isPendingSyncReset = true; await ctx.get(SessionRepository).update(auth.session!.id, {
isPendingSyncReset: true,
});
await expect(ctx.syncStream(auth, [SyncRequestType.AssetsV1], true)).resolves.toEqual([ await expect(ctx.syncStream(auth, [SyncRequestType.AssetsV1], true)).resolves.toEqual([
expect.objectContaining({ expect.objectContaining({
@@ -60,4 +67,28 @@ describe(SyncEntityType.SyncResetV1, () => {
}), }),
]); ]);
}); });
it('should reset the sync progress', async () => {
const { auth, user, ctx } = await setup();
await ctx.newAsset({ ownerId: user.id });
const response = await ctx.syncStream(auth, [SyncRequestType.AssetsV1]);
await ctx.syncAckAll(auth, response);
await ctx.get(SessionRepository).update(auth.session!.id, {
isPendingSyncReset: true,
});
const resetResponse = await ctx.syncStream(auth, [SyncRequestType.AssetsV1]);
await ctx.syncAckAll(auth, resetResponse);
const postResetResponse = await ctx.syncStream(auth, [SyncRequestType.AssetsV1]);
expect(postResetResponse).toEqual([
expect.objectContaining({
type: SyncEntityType.AssetV1,
}),
]);
});
}); });
-1
View File
@@ -60,7 +60,6 @@ const authFactory = ({
if (session) { if (session) {
auth.session = { auth.session = {
id: session.id, id: session.id,
isPendingSyncReset: false,
hasElevatedPermission: false, hasElevatedPermission: false,
}; };
} }