import { Injectable } from '@nestjs/common';
import { Insertable, Kysely, SelectQueryBuilder, sql } from 'kysely';
import { InjectKysely } from 'nestjs-kysely';
import { columns } from 'src/database';
import { DB, SessionSyncCheckpoints } from 'src/db';
import { DummyValue, GenerateSql } from 'src/decorators';
import { SyncEntityType } from 'src/enum';
import { SyncAck } from 'src/types';

/** Tables that `auditTableFilters` may be applied to (filtered on `deletedAt`, ordered by `id`). */
type auditTables = 'users_audit' | 'partners_audit' | 'assets_audit';

/** Tables that `upsertTableFilters` may be applied to (filtered on `updatedAt`, ordered by `updateId`). */
type upsertTables = 'users' | 'partners' | 'assets' | 'exif';

/**
 * Database access for sync: per-session checkpoints plus streaming queries
 * for upserted and deleted users, partners, assets and exif rows.
 *
 * Every `get*Upserts` / `get*Deletes` method takes an optional `ack`; when it
 * is given, only rows positioned after `ack.updateId` are returned, in
 * ascending order of the same column used for the comparison.
 */
@Injectable()
export class SyncRepository {
  constructor(@InjectKysely() private db: Kysely<DB>) {}

  /**
   * Returns the `type` and `ack` of every checkpoint stored for a session.
   *
   * @param sessionId session whose checkpoints are read
   */
  @GenerateSql({ params: [DummyValue.UUID] })
  getCheckpoints(sessionId: string) {
    return this.db
      .selectFrom('session_sync_checkpoints')
      .select(['type', 'ack'])
      .where('sessionId', '=', sessionId)
      .execute();
  }

  /**
   * Inserts checkpoints; when a row with the same (`sessionId`, `type`)
   * already exists, its `ack` is overwritten with the incoming value.
   *
   * @param items checkpoint rows to insert or update
   */
  upsertCheckpoints(items: Insertable<SessionSyncCheckpoints>[]) {
    return this.db
      .insertInto('session_sync_checkpoints')
      .values(items)
      .onConflict((oc) =>
        oc.columns(['sessionId', 'type']).doUpdateSet((eb) => ({
          ack: eb.ref('excluded.ack'),
        })),
      )
      .execute();
  }

  /**
   * Deletes checkpoints of a session.
   *
   * @param sessionId session whose checkpoints are deleted
   * @param types when provided, only checkpoints of these types are deleted;
   *   when omitted, all checkpoints of the session are deleted
   */
  @GenerateSql({ params: [DummyValue.UUID] })
  deleteCheckpoints(sessionId: string, types?: SyncEntityType[]) {
    return (
      this.db
        .deleteFrom('session_sync_checkpoints')
        .where('sessionId', '=', sessionId)
        // NOTE(review): an empty array is truthy and still adds the `in` filter — confirm callers never pass `[]`.
        .$if(!!types, (qb) => qb.where('type', 'in', types!))
        .execute()
    );
  }

  /**
   * Streams rows from `users`, subject to the shared upsert filters.
   *
   * @param ack optional position; only rows with a greater `updateId` are returned
   */
  @GenerateSql({ params: [], stream: true })
  getUserUpserts(ack?: SyncAck) {
    return this.db
      .selectFrom('users')
      .select(['id', 'name', 'email', 'deletedAt', 'updateId'])
      .$call((qb) => this.upsertTableFilters(qb, ack))
      .stream();
  }

  /**
   * Streams rows from `users_audit`, subject to the shared audit filters.
   *
   * @param ack optional position; only rows with a greater `id` are returned
   */
  @GenerateSql({ params: [], stream: true })
  getUserDeletes(ack?: SyncAck) {
    return this.db
      .selectFrom('users_audit')
      .select(['id', 'userId'])
      .$call((qb) => this.auditTableFilters(qb, ack))
      .stream();
  }

  /**
   * Streams `partners` rows in which the user is either side of the link
   * (`sharedById` or `sharedWithId`).
   *
   * @param userId user appearing on either side of the partner link
   * @param ack optional position; only rows with a greater `updateId` are returned
   */
  @GenerateSql({ params: [DummyValue.UUID], stream: true })
  getPartnerUpserts(userId: string, ack?: SyncAck) {
    return this.db
      .selectFrom('partners')
      .select(['sharedById', 'sharedWithId', 'inTimeline', 'updateId'])
      .where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)]))
      .$call((qb) => this.upsertTableFilters(qb, ack))
      .stream();
  }

  /**
   * Streams `partners_audit` rows in which the user is either side of the
   * link (`sharedById` or `sharedWithId`).
   *
   * @param userId user appearing on either side of the partner link
   * @param ack optional position; only rows with a greater `id` are returned
   */
  @GenerateSql({ params: [DummyValue.UUID], stream: true })
  getPartnerDeletes(userId: string, ack?: SyncAck) {
    return this.db
      .selectFrom('partners_audit')
      .select(['id', 'sharedById', 'sharedWithId'])
      .where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)]))
      .$call((qb) => this.auditTableFilters(qb, ack))
      .stream();
  }

  /**
   * Streams `assets` rows owned by the user, selecting `columns.syncAsset`.
   *
   * @param userId owner of the assets
   * @param ack optional position; only rows with a greater `updateId` are returned
   */
  @GenerateSql({ params: [DummyValue.UUID], stream: true })
  getAssetUpserts(userId: string, ack?: SyncAck) {
    return this.db
      .selectFrom('assets')
      .select(columns.syncAsset)
      .where('ownerId', '=', userId)
      .$call((qb) => this.upsertTableFilters(qb, ack))
      .stream();
  }

  /**
   * Streams `assets` rows whose owner appears as `sharedById` in a `partners`
   * row where `sharedWithId` is the given user.
   *
   * @param userId user the assets are shared with
   * @param ack optional position; only rows with a greater `updateId` are returned
   */
  @GenerateSql({ params: [DummyValue.UUID], stream: true })
  getPartnerAssetsUpserts(userId: string, ack?: SyncAck) {
    return this.db
      .selectFrom('assets')
      .select(columns.syncAsset)
      .where('ownerId', 'in', (eb) =>
        eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId),
      )
      .$call((qb) => this.upsertTableFilters(qb, ack))
      .stream();
  }

  /**
   * Streams `assets_audit` rows owned by the user.
   *
   * The `ack` position filter is applied once, inside `auditTableFilters`.
   *
   * @param userId owner recorded on the audit rows
   * @param ack optional position; only rows with a greater `id` are returned
   */
  @GenerateSql({ params: [DummyValue.UUID], stream: true })
  getAssetDeletes(userId: string, ack?: SyncAck) {
    return this.db
      .selectFrom('assets_audit')
      .select(['id', 'assetId'])
      .where('ownerId', '=', userId)
      .$call((qb) => this.auditTableFilters(qb, ack))
      .stream();
  }

  /**
   * Streams `assets_audit` rows whose owner appears as `sharedById` in a
   * `partners` row where `sharedWithId` is the given user.
   *
   * @param userId user the assets were shared with
   * @param ack optional position; only rows with a greater `id` are returned
   */
  @GenerateSql({ params: [DummyValue.UUID], stream: true })
  getPartnerAssetDeletes(userId: string, ack?: SyncAck) {
    return this.db
      .selectFrom('assets_audit')
      .select(['id', 'assetId'])
      .where('ownerId', 'in', (eb) =>
        eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId),
      )
      .$call((qb) => this.auditTableFilters(qb, ack))
      .stream();
  }

  /**
   * Streams `exif` rows (selecting `columns.syncAssetExif`) for assets owned
   * by the user.
   *
   * @param userId owner of the assets the exif rows belong to
   * @param ack optional position; only rows with a greater `updateId` are returned
   */
  @GenerateSql({ params: [DummyValue.UUID], stream: true })
  getAssetExifsUpserts(userId: string, ack?: SyncAck) {
    return this.db
      .selectFrom('exif')
      .select(columns.syncAssetExif)
      .where('assetId', 'in', (eb) => eb.selectFrom('assets').select('id').where('ownerId', '=', userId))
      .$call((qb) => this.upsertTableFilters(qb, ack))
      .stream();
  }

  /**
   * Streams `exif` rows (selecting `columns.syncAssetExif`) for assets whose
   * owner appears as `sharedById` in a `partners` row where `sharedWithId`
   * is the given user.
   *
   * @param userId user the assets are shared with
   * @param ack optional position; only rows with a greater `updateId` are returned
   */
  @GenerateSql({ params: [DummyValue.UUID], stream: true })
  getPartnerAssetExifsUpserts(userId: string, ack?: SyncAck) {
    return this.db
      .selectFrom('exif')
      .select(columns.syncAssetExif)
      .where('assetId', 'in', (eb) =>
        eb
          .selectFrom('assets')
          .select('id')
          .where('ownerId', 'in', (eb) =>
            eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId),
          ),
      )
      .$call((qb) => this.upsertTableFilters(qb, ack))
      .stream();
  }

  /**
   * Applies the filters shared by all audit-table queries:
   * `deletedAt` earlier than `now() - 1 millisecond`, `id` greater than
   * `ack.updateId` when an ack is given, ordered by `id` ascending.
   *
   * @param qb query over one of the audit tables
   * @param ack optional position to resume after
   * @returns the same query, typed as the caller passed it in
   */
  private auditTableFilters<T extends keyof Pick<DB, auditTables>, D>(qb: SelectQueryBuilder<DB, T, D>, ack?: SyncAck) {
    // Widen to the union of audit tables so the column names used below
    // type-check for every table; narrowed back to the caller's type on return.
    const builder = qb as SelectQueryBuilder<DB, auditTables, D>;
    return (
      builder
        // NOTE(review): the 1 ms margin presumably keeps rows written "right now" out of the page — confirm intent.
        .where('deletedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
        .$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId))
        .orderBy('id', 'asc') as SelectQueryBuilder<DB, T, D>
    );
  }

  /**
   * Applies the filters shared by all upsert-table queries:
   * `updatedAt` earlier than `now() - 1 millisecond`, `updateId` greater than
   * `ack.updateId` when an ack is given, ordered by `updateId` ascending.
   *
   * @param qb query over one of the upsert tables
   * @param ack optional position to resume after
   * @returns the same query, typed as the caller passed it in
   */
  private upsertTableFilters<T extends keyof Pick<DB, upsertTables>, D>(
    qb: SelectQueryBuilder<DB, T, D>,
    ack?: SyncAck,
  ) {
    // Widen to the union of upsert tables so the column names used below
    // type-check for every table; narrowed back to the caller's type on return.
    const builder = qb as SelectQueryBuilder<DB, upsertTables, D>;
    return (
      builder
        // NOTE(review): the 1 ms margin presumably keeps rows written "right now" out of the page — confirm intent.
        .where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
        .$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId))
        .orderBy('updateId', 'asc') as SelectQueryBuilder<DB, T, D>
    );
  }
}