merge main

This commit is contained in:
shenlong-tanwen
2025-09-17 16:08:34 +05:30
528 changed files with 17365 additions and 6390 deletions
+16 -5
View File
@@ -1,21 +1,20 @@
import 'package:immich_mobile/domain/models/album/local_album.model.dart';
import 'package:immich_mobile/domain/models/asset/base_asset.model.dart';
import 'package:immich_mobile/domain/models/exif.model.dart';
import 'package:immich_mobile/extensions/platform_extensions.dart';
import 'package:immich_mobile/infrastructure/repositories/local_asset.repository.dart';
import 'package:immich_mobile/infrastructure/repositories/remote_asset.repository.dart';
import 'package:immich_mobile/infrastructure/utils/exif.converter.dart';
import 'package:platform/platform.dart';
class AssetService {
final RemoteAssetRepository _remoteAssetRepository;
final DriftLocalAssetRepository _localAssetRepository;
final Platform _platform;
const AssetService({
required RemoteAssetRepository remoteAssetRepository,
required DriftLocalAssetRepository localAssetRepository,
}) : _remoteAssetRepository = remoteAssetRepository,
_localAssetRepository = localAssetRepository,
_platform = const LocalPlatform();
_localAssetRepository = localAssetRepository;
Future<BaseAsset?> getAsset(BaseAsset asset) {
final id = asset is LocalAsset ? asset.id : (asset as RemoteAsset).id;
@@ -27,6 +26,14 @@ class AssetService {
return asset is LocalAsset ? _localAssetRepository.watch(id) : _remoteAssetRepository.watch(id);
}
Future<List<LocalAsset?>> getLocalAssetsByChecksum(String checksum) {
return _localAssetRepository.getByChecksum(checksum);
}
Future<RemoteAsset?> getRemoteAssetByChecksum(String checksum) {
return _remoteAssetRepository.getByChecksum(checksum);
}
Future<RemoteAsset?> getRemoteAsset(String id) {
return _remoteAssetRepository.get(id);
}
@@ -62,7 +69,7 @@ class AssetService {
width = exif?.width ?? asset.width?.toDouble();
height = exif?.height ?? asset.height?.toDouble();
} else if (asset is LocalAsset) {
isFlipped = _platform.isAndroid && (asset.orientation == 90 || asset.orientation == 270);
isFlipped = CurrentPlatform.isAndroid && (asset.orientation == 90 || asset.orientation == 270);
width = asset.width?.toDouble();
height = asset.height?.toDouble();
} else {
@@ -89,4 +96,8 @@ class AssetService {
Future<int> getLocalHashedCount() {
return _localAssetRepository.getHashedCount();
}
Future<List<LocalAlbum>> getSourceAlbums(String localAssetId, {BackupSelection? backupSelection}) {
return _localAssetRepository.getSourceAlbums(localAssetId, backupSelection: backupSelection);
}
}
@@ -1,11 +1,17 @@
import 'dart:async';
import 'dart:io';
import 'dart:ui';
import 'package:background_downloader/background_downloader.dart';
import 'package:cancellation_token_http/http.dart';
import 'package:flutter/material.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/constants/constants.dart';
import 'package:immich_mobile/domain/utils/isolate_lock_manager.dart';
import 'package:immich_mobile/domain/services/log.service.dart';
import 'package:immich_mobile/entities/store.entity.dart';
import 'package:immich_mobile/extensions/network_capability_extensions.dart';
import 'package:immich_mobile/extensions/translate_extensions.dart';
import 'package:immich_mobile/generated/intl_keys.g.dart';
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
import 'package:immich_mobile/infrastructure/repositories/logger_db.repository.dart';
import 'package:immich_mobile/platform/background_worker_api.g.dart';
@@ -14,16 +20,20 @@ import 'package:immich_mobile/providers/background_sync.provider.dart';
import 'package:immich_mobile/providers/backup/drift_backup.provider.dart';
import 'package:immich_mobile/providers/db.provider.dart';
import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
import 'package:immich_mobile/providers/infrastructure/platform.provider.dart';
import 'package:immich_mobile/providers/user.provider.dart';
import 'package:immich_mobile/repositories/file_media.repository.dart';
import 'package:immich_mobile/services/app_settings.service.dart';
import 'package:immich_mobile/services/auth.service.dart';
import 'package:immich_mobile/services/localization.service.dart';
import 'package:immich_mobile/services/server_info.service.dart';
import 'package:immich_mobile/services/upload.service.dart';
import 'package:immich_mobile/utils/bootstrap.dart';
import 'package:immich_mobile/utils/debug_print.dart';
import 'package:immich_mobile/utils/http_ssl_options.dart';
import 'package:isar/isar.dart';
import 'package:logging/logging.dart';
import 'package:worker_manager/worker_manager.dart';
class BackgroundWorkerFgService {
final BackgroundWorkerFgHostApi _foregroundHostApi;
@@ -42,8 +52,8 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
final Drift _drift;
final DriftLogger _driftLogger;
final BackgroundWorkerBgHostApi _backgroundHostApi;
final Logger _logger = Logger('BackgroundUploadBgService');
late final IsolateLockManager _lockManager;
final CancellationToken _cancellationToken = CancellationToken();
final Logger _logger = Logger('BackgroundWorkerBgService');
bool _isCleanedUp = false;
@@ -59,7 +69,6 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
driftProvider.overrideWith(driftOverride(drift)),
],
);
_lockManager = IsolateLockManager(onCloseRequest: _cleanup);
BackgroundWorkerFlutterApi.setUp(this);
}
@@ -67,41 +76,37 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
Future<void> init() async {
try {
await loadTranslations();
HttpSSLOptions.apply(applyNative: false);
await _ref.read(authServiceProvider).setOpenApiServiceEndpoint();
// Initialize the file downloader
await FileDownloader().configure(
globalConfig: [
// maxConcurrent: 6, maxConcurrentByHost(server):6, maxConcurrentByGroup: 3
(Config.holdingQueue, (6, 6, 3)),
// On Android, if files are larger than 256MB, run in foreground service
(Config.runInForegroundIfFileLargerThan, 256),
],
);
await FileDownloader().trackTasksInGroup(kDownloadGroupLivePhoto, markDownloadedComplete: false);
await FileDownloader().trackTasks();
await Future.wait([
loadTranslations(),
workerManager.init(dynamicSpawning: true),
_ref.read(authServiceProvider).setOpenApiServiceEndpoint(),
// Initialize the file downloader
FileDownloader().configure(
globalConfig: [
// maxConcurrent: 6, maxConcurrentByHost(server):6, maxConcurrentByGroup: 3
(Config.holdingQueue, (6, 6, 3)),
// On Android, if files are larger than 256MB, run in foreground service
(Config.runInForegroundIfFileLargerThan, 256),
],
),
FileDownloader().trackTasksInGroup(kDownloadGroupLivePhoto, markDownloadedComplete: false),
FileDownloader().trackTasks(),
_ref.read(fileMediaRepositoryProvider).enableBackgroundAccess(),
]);
configureFileDownloaderNotifications();
await _ref.read(fileMediaRepositoryProvider).enableBackgroundAccess();
// Notify the host that the background upload service has been initialized and is ready to use
debugPrint("Acquiring background worker lock");
if (await _lockManager.acquireLock().timeout(
const Duration(seconds: 5),
onTimeout: () {
_lockManager.cancel();
return false;
},
)) {
_logger.info("Acquired background worker lock");
await _backgroundHostApi.onInitialized();
return;
if (Platform.isAndroid) {
await _backgroundHostApi.showNotification(
IntlKeys.uploading_media.t(),
IntlKeys.backup_background_service_in_progress_notification.t(),
);
}
_logger.warning("Failed to acquire background worker lock");
await _cleanup();
await _backgroundHostApi.close();
// Notify the host that the background worker service has been initialized and is ready to use
_backgroundHostApi.onInitialized();
} catch (error, stack) {
_logger.severe("Failed to initialize background worker", error, stack);
await _backgroundHostApi.close();
@@ -115,7 +120,7 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
final sw = Stopwatch()..start();
await _syncAssets(hashTimeout: Duration(minutes: _isBackupEnabled ? 3 : 6));
await _handleBackup(processBulk: false);
await _handleBackup();
sw.stop();
_logger.info("Android background processing completed in ${sw.elapsed.inSeconds}s");
@@ -157,7 +162,7 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
try {
await _cleanup();
} catch (error, stack) {
debugPrint('Failed to cleanup background worker: $error with stack: $stack');
dPrint(() => 'Failed to cleanup background worker: $error with stack: $stack');
}
}
@@ -167,70 +172,96 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
}
try {
final backgroundSyncManager = _ref.read(backgroundSyncProvider);
_isCleanedUp = true;
_logger.info("Cleaning up background worker");
await _ref.read(backgroundSyncProvider).cancel();
await _ref.read(backgroundSyncProvider).cancelLocal();
if (_isar.isOpen) {
await _isar.close();
}
await _drift.close();
await _driftLogger.close();
_ref.dispose();
_lockManager.releaseLock();
_cancellationToken.cancel();
_logger.info("Cleaning up background worker");
final cleanupFutures = [
workerManager.dispose().catchError((_) async {
// Discard any errors on the dispose call
return;
}),
LogService.I.dispose(),
Store.dispose(),
_drift.close(),
_driftLogger.close(),
backgroundSyncManager.cancel(),
backgroundSyncManager.cancelLocal(),
];
if (_isar.isOpen) {
cleanupFutures.add(_isar.close());
}
await Future.wait(cleanupFutures);
_logger.info("Background worker resources cleaned up");
} catch (error, stack) {
debugPrint('Failed to cleanup background worker: $error with stack: $stack');
dPrint(() => 'Failed to cleanup background worker: $error with stack: $stack');
}
}
Future<void> _handleBackup({bool processBulk = true}) async {
if (!_isBackupEnabled) {
return;
}
Future<void> _handleBackup() async {
await runZonedGuarded(
() async {
if (!_isBackupEnabled || _isCleanedUp) {
_logger.info("[_handleBackup 1] Backup is disabled. Skipping backup routine");
return;
}
final currentUser = _ref.read(currentUserProvider);
if (currentUser == null) {
return;
}
_logger.info("[_handleBackup 2] Enqueuing assets for backup from the background service");
if (processBulk) {
return _ref.read(driftBackupProvider.notifier).handleBackupResume(currentUser.id);
}
final currentUser = _ref.read(currentUserProvider);
if (currentUser == null) {
_logger.warning("[_handleBackup 3] No current user found. Skipping backup from background");
return;
}
final activeTask = await _ref.read(uploadServiceProvider).getActiveTasks(currentUser.id);
if (activeTask.isNotEmpty) {
await _ref.read(uploadServiceProvider).resumeBackup();
} else {
await _ref.read(uploadServiceProvider).startBackupSerial(currentUser.id);
}
_logger.info("[_handleBackup 4] Resume backup from background");
if (Platform.isIOS) {
return _ref.read(driftBackupProvider.notifier).handleBackupResume(currentUser.id);
}
final canPing = await _ref.read(serverInfoServiceProvider).ping();
if (!canPing) {
_logger.warning("[_handleBackup 5] Server is not reachable. Skipping backup from background");
return;
}
final networkCapabilities = await _ref.read(connectivityApiProvider).getCapabilities();
return _ref
.read(uploadServiceProvider)
.startBackupWithHttpClient(currentUser.id, networkCapabilities.hasWifi, _cancellationToken);
},
(error, stack) {
dPrint(() => "Error in backup zone $error, $stack");
},
);
}
Future<void> _syncAssets({Duration? hashTimeout}) async {
final futures = <Future<void>>[];
await _ref.read(backgroundSyncProvider).syncLocal();
if (_isCleanedUp) {
return;
}
final localSyncFuture = _ref.read(backgroundSyncProvider).syncLocal().then((_) async {
if (_isCleanedUp) {
return;
}
await _ref.read(backgroundSyncProvider).syncRemote();
if (_isCleanedUp) {
return;
}
var hashFuture = _ref.read(backgroundSyncProvider).hashAssets();
if (hashTimeout != null) {
hashFuture = hashFuture.timeout(
hashTimeout,
onTimeout: () {
// Consume cancellation errors as we want to continue processing
},
);
}
var hashFuture = _ref.read(backgroundSyncProvider).hashAssets();
if (hashTimeout != null) {
hashFuture = hashFuture.timeout(
hashTimeout,
onTimeout: () {
// Consume cancellation errors as we want to continue processing
},
);
}
return hashFuture;
});
futures.add(localSyncFuture);
futures.add(_ref.read(backgroundSyncProvider).syncRemote());
await Future.wait(futures);
await hashFuture;
}
}
@@ -242,6 +273,6 @@ Future<void> backgroundSyncNativeEntrypoint() async {
DartPluginRegistrant.ensureInitialized();
final (isar, drift, logDB) = await Bootstrap.initDB();
await Bootstrap.initDomain(isar, drift, logDB, shouldBufferLogs: false);
await Bootstrap.initDomain(isar, drift, logDB, shouldBufferLogs: false, listenStoreUpdates: false);
await BackgroundWorkerBgService(isar: isar, drift: drift, driftLogger: logDB).init();
}
@@ -1,29 +1,24 @@
import 'dart:async';
import 'package:collection/collection.dart';
import 'package:flutter/widgets.dart';
import 'package:flutter/foundation.dart';
import 'package:immich_mobile/domain/models/album/local_album.model.dart';
import 'package:immich_mobile/domain/models/asset/base_asset.model.dart';
import 'package:immich_mobile/extensions/platform_extensions.dart';
import 'package:immich_mobile/infrastructure/repositories/local_album.repository.dart';
import 'package:immich_mobile/platform/native_sync_api.g.dart';
import 'package:immich_mobile/utils/datetime_helpers.dart';
import 'package:immich_mobile/utils/diff.dart';
import 'package:logging/logging.dart';
import 'package:platform/platform.dart';
class LocalSyncService {
final DriftLocalAlbumRepository _localAlbumRepository;
final NativeSyncApi _nativeSyncApi;
final Platform _platform;
final Logger _log = Logger("DeviceSyncService");
LocalSyncService({
required DriftLocalAlbumRepository localAlbumRepository,
required NativeSyncApi nativeSyncApi,
Platform? platform,
}) : _localAlbumRepository = localAlbumRepository,
_nativeSyncApi = nativeSyncApi,
_platform = platform ?? const LocalPlatform();
LocalSyncService({required DriftLocalAlbumRepository localAlbumRepository, required NativeSyncApi nativeSyncApi})
: _localAlbumRepository = localAlbumRepository,
_nativeSyncApi = nativeSyncApi;
Future<void> sync({bool full = false}) async {
final Stopwatch stopwatch = Stopwatch()..start();
@@ -53,14 +48,14 @@ class LocalSyncService {
final dbAlbums = await _localAlbumRepository.getAll();
// On Android, we need to sync all albums since it is not possible to
// detect album deletions from the native side
if (_platform.isAndroid) {
if (CurrentPlatform.isAndroid) {
for (final album in dbAlbums) {
final deviceIds = await _nativeSyncApi.getAssetIdsForAlbum(album.id);
await _localAlbumRepository.syncDeletes(album.id, deviceIds);
}
}
if (_platform.isIOS) {
if (CurrentPlatform.isIOS) {
// On iOS, we need to full sync albums that are marked as cloud as the delta sync
// does not include changes for cloud albums. If ignoreIcloudAssets is enabled,
// remove the albums from the local database from the previous sync
+7 -8
View File
@@ -1,11 +1,11 @@
import 'dart:async';
import 'package:flutter/foundation.dart';
import 'package:immich_mobile/constants/constants.dart';
import 'package:immich_mobile/domain/models/log.model.dart';
import 'package:immich_mobile/domain/models/store.model.dart';
import 'package:immich_mobile/infrastructure/repositories/log.repository.dart';
import 'package:immich_mobile/infrastructure/repositories/store.repository.dart';
import 'package:immich_mobile/utils/debug_print.dart';
import 'package:logging/logging.dart';
/// Service responsible for handling application logging.
@@ -66,13 +66,12 @@ class LogService {
}
void _handleLogRecord(LogRecord r) {
if (kDebugMode) {
debugPrint(
'[${r.level.name}] [${r.time}] [${r.loggerName}] ${r.message}'
'${r.error == null ? '' : '\nError: ${r.error}'}'
'${r.stackTrace == null ? '' : '\nStack: ${r.stackTrace}'}',
);
}
dPrint(
() =>
'[${r.level.name}] [${r.time}] [${r.loggerName}] ${r.message}'
'${r.error == null ? '' : '\nError: ${r.error}'}'
'${r.stackTrace == null ? '' : '\nStack: ${r.stackTrace}'}',
);
final record = LogMessage(
message: r.message,
@@ -1,7 +1,7 @@
import 'package:flutter/foundation.dart';
import 'package:immich_mobile/domain/models/user.model.dart';
import 'package:immich_mobile/infrastructure/repositories/partner.repository.dart';
import 'package:immich_mobile/repositories/partner_api.repository.dart';
import 'package:immich_mobile/utils/debug_print.dart';
class DriftPartnerService {
final DriftPartnerRepository _driftPartnerRepository;
@@ -30,7 +30,7 @@ class DriftPartnerService {
Future<void> toggleShowInTimeline(String partnerId, String userId) async {
final partner = await _driftPartnerRepository.getPartner(partnerId, userId);
if (partner == null) {
debugPrint("Partner not found: $partnerId for user: $userId");
dPrint(() => "Partner not found: $partnerId for user: $userId");
return;
}
+10 -8
View File
@@ -10,7 +10,7 @@ class StoreService {
/// In-memory cache. Keys are [StoreKey.id]
final Map<int, Object?> _cache = {};
late final StreamSubscription<List<StoreDto>> _storeUpdateSubscription;
StreamSubscription<List<StoreDto>>? _storeUpdateSubscription;
StoreService._({required IStoreRepository isarStoreRepository}) : _storeRepository = isarStoreRepository;
@@ -24,15 +24,17 @@ class StoreService {
}
// TODO: Replace the implementation with the one from create after removing the typedef
static Future<StoreService> init({required IStoreRepository storeRepository}) async {
_instance ??= await create(storeRepository: storeRepository);
static Future<StoreService> init({required IStoreRepository storeRepository, bool listenUpdates = true}) async {
_instance ??= await create(storeRepository: storeRepository, listenUpdates: listenUpdates);
return _instance!;
}
static Future<StoreService> create({required IStoreRepository storeRepository}) async {
static Future<StoreService> create({required IStoreRepository storeRepository, bool listenUpdates = true}) async {
final instance = StoreService._(isarStoreRepository: storeRepository);
await instance.populateCache();
instance._storeUpdateSubscription = instance._listenForChange();
if (listenUpdates) {
instance._storeUpdateSubscription = instance._listenForChange();
}
return instance;
}
@@ -50,8 +52,8 @@ class StoreService {
});
/// Disposes the store and cancels the subscription. To reuse the store call init() again
void dispose() async {
await _storeUpdateSubscription.cancel();
Future<void> dispose() async {
await _storeUpdateSubscription?.cancel();
_cache.clear();
}
@@ -90,7 +92,7 @@ class StoreService {
_cache.clear();
}
bool get isBetaTimelineEnabled => tryGet(StoreKey.betaTimeline) ?? false;
bool get isBetaTimelineEnabled => tryGet(StoreKey.betaTimeline) ?? true;
}
class StoreKeyNotFoundException implements Exception {
@@ -1,10 +1,11 @@
import 'package:flutter/material.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/domain/models/album/local_album.model.dart';
import 'package:immich_mobile/infrastructure/repositories/local_album.repository.dart';
import 'package:immich_mobile/infrastructure/repositories/remote_album.repository.dart';
import 'package:immich_mobile/providers/infrastructure/album.provider.dart';
import 'package:immich_mobile/repositories/drift_album_api_repository.dart';
import 'package:logging/logging.dart';
import 'package:immich_mobile/utils/debug_print.dart';
final syncLinkedAlbumServiceProvider = Provider(
(ref) => SyncLinkedAlbumService(
@@ -19,7 +20,9 @@ class SyncLinkedAlbumService {
final DriftRemoteAlbumRepository _remoteAlbumRepository;
final DriftAlbumApiRepository _albumApiRepository;
const SyncLinkedAlbumService(this._localAlbumRepository, this._remoteAlbumRepository, this._albumApiRepository);
SyncLinkedAlbumService(this._localAlbumRepository, this._remoteAlbumRepository, this._albumApiRepository);
final _log = Logger("SyncLinkedAlbumService");
Future<void> syncLinkedAlbums(String userId) async {
final selectedAlbums = await _localAlbumRepository.getBackupAlbums();
@@ -48,8 +51,12 @@ class SyncLinkedAlbumService {
}
Future<void> manageLinkedAlbums(List<LocalAlbum> localAlbums, String ownerId) async {
for (final album in localAlbums) {
await _processLocalAlbum(album, ownerId);
try {
for (final album in localAlbums) {
await _processLocalAlbum(album, ownerId);
}
} catch (error, stackTrace) {
_log.severe("Error managing linked albums", error, stackTrace);
}
}
@@ -93,7 +100,7 @@ class SyncLinkedAlbumService {
/// Creates a new remote album and links it to the local album
Future<void> _createAndLinkNewRemoteAlbum(LocalAlbum localAlbum) async {
debugPrint("Creating new remote album for local album: ${localAlbum.name}");
dPrint(() => "Creating new remote album for local album: ${localAlbum.name}");
final newRemoteAlbum = await _albumApiRepository.createDriftAlbum(localAlbum.name, assetIds: []);
await _remoteAlbumRepository.create(newRemoteAlbum, []);
return _localAlbumRepository.linkRemoteAlbum(localAlbum.id, newRemoteAlbum.id);
@@ -23,54 +23,18 @@ class SyncStreamService {
bool get isCancelled => _cancelChecker?.call() ?? false;
Future<void> sync() {
Future<void> sync() async {
_logger.info("Remote sync request for user");
// Start the sync stream and handle events
return _syncApiRepository.streamChanges(_handleEvents);
}
Future<void> handleWsAssetUploadReadyV1Batch(List<dynamic> batchData) async {
if (batchData.isEmpty) return;
_logger.info('Processing batch of ${batchData.length} AssetUploadReadyV1 events');
final List<SyncAssetV1> assets = [];
final List<SyncAssetExifV1> exifs = [];
try {
for (final data in batchData) {
if (data is! Map<String, dynamic>) {
continue;
}
final payload = data;
final assetData = payload['asset'];
final exifData = payload['exif'];
if (assetData == null || exifData == null) {
continue;
}
final asset = SyncAssetV1.fromJson(assetData);
final exif = SyncAssetExifV1.fromJson(exifData);
if (asset != null && exif != null) {
assets.add(asset);
exifs.add(exif);
}
}
if (assets.isNotEmpty && exifs.isNotEmpty) {
await _syncStreamRepository.updateAssetsV1(assets, debugLabel: 'websocket-batch');
await _syncStreamRepository.updateAssetsExifV1(exifs, debugLabel: 'websocket-batch');
_logger.info('Successfully processed ${assets.length} assets in batch');
}
} catch (error, stackTrace) {
_logger.severe("Error processing AssetUploadReadyV1 websocket batch events", error, stackTrace);
bool shouldReset = false;
await _syncApiRepository.streamChanges(_handleEvents, onReset: () => shouldReset = true);
if (shouldReset) {
_logger.info("Resetting sync state as requested by server");
await _syncApiRepository.streamChanges(_handleEvents);
}
}
Future<void> _handleEvents(List<SyncEvent> events, Function() abort) async {
Future<void> _handleEvents(List<SyncEvent> events, Function() abort, Function() reset) async {
List<SyncEvent> items = [];
for (final event in events) {
if (isCancelled) {
@@ -83,6 +47,10 @@ class SyncStreamService {
await _processBatch(items);
}
if (event.type == SyncEntityType.syncResetV1) {
reset();
}
items.add(event);
}
@@ -103,6 +71,8 @@ class SyncStreamService {
Future<void> _handleSyncData(SyncEntityType type, Iterable<Object> data) async {
_logger.fine("Processing sync data for $type of length ${data.length}");
switch (type) {
case SyncEntityType.authUserV1:
return _syncStreamRepository.updateAuthUsersV1(data.cast());
case SyncEntityType.userV1:
return _syncStreamRepository.updateUsersV1(data.cast());
case SyncEntityType.userDeleteV1:
@@ -159,6 +129,12 @@ class SyncStreamService {
// to acknowledge that the client has processed all the backfill events
case SyncEntityType.syncAckV1:
return;
// No-op. SyncCompleteV1 is used to signal the completion of the sync process
case SyncEntityType.syncCompleteV1:
return;
// Request to reset the client state. Clear everything related to remote entities
case SyncEntityType.syncResetV1:
return _syncStreamRepository.reset();
case SyncEntityType.memoryV1:
return _syncStreamRepository.updateMemoriesV1(data.cast());
case SyncEntityType.memoryDeleteV1:
@@ -193,4 +169,45 @@ class SyncStreamService {
_logger.warning("Unknown sync data type: $type");
}
}
Future<void> handleWsAssetUploadReadyV1Batch(List<dynamic> batchData) async {
if (batchData.isEmpty) return;
_logger.info('Processing batch of ${batchData.length} AssetUploadReadyV1 events');
final List<SyncAssetV1> assets = [];
final List<SyncAssetExifV1> exifs = [];
try {
for (final data in batchData) {
if (data is! Map<String, dynamic>) {
continue;
}
final payload = data;
final assetData = payload['asset'];
final exifData = payload['exif'];
if (assetData == null || exifData == null) {
continue;
}
final asset = SyncAssetV1.fromJson(assetData);
final exif = SyncAssetExifV1.fromJson(exifData);
if (asset != null && exif != null) {
assets.add(asset);
exifs.add(exif);
}
}
if (assets.isNotEmpty && exifs.isNotEmpty) {
await _syncStreamRepository.updateAssetsV1(assets, debugLabel: 'websocket-batch');
await _syncStreamRepository.updateAssetsExifV1(exifs, debugLabel: 'websocket-batch');
_logger.info('Successfully processed ${assets.length} assets in batch');
}
} catch (error, stackTrace) {
_logger.severe("Error processing AssetUploadReadyV1 websocket batch events", error, stackTrace);
}
}
}