feat(mobile): use efficient sync (#8842)

* feat(mobile): use efficient sync

review feedback

* adapt to changed  server endpoints

* formatting

* fix memory lane bug

* fix: bad merge

* fix call not returning correct number of asset

---------

Co-authored-by: Alex Tran <alex.tran1502@gmail.com>
This commit is contained in:
Fynn Petersen-Frey
2024-05-14 17:35:37 +02:00
committed by GitHub
parent acc611a3d9
commit 116043b2b4
12 changed files with 185 additions and 125 deletions
+2
View File
@@ -23,6 +23,7 @@ class ApiService {
late PersonApi personApi;
late AuditApi auditApi;
late SharedLinkApi sharedLinkApi;
late SyncApi syncApi;
late SystemConfigApi systemConfigApi;
late ActivityApi activityApi;
late DownloadApi downloadApi;
@@ -53,6 +54,7 @@ class ApiService {
personApi = PersonApi(_apiClient);
auditApi = AuditApi(_apiClient);
sharedLinkApi = SharedLinkApi(_apiClient);
syncApi = SyncApi(_apiClient);
systemConfigApi = SystemConfigApi(_apiClient);
activityApi = ActivityApi(_apiClient);
downloadApi = DownloadApi(_apiClient);
+43 -37
View File
@@ -5,13 +5,14 @@ import 'dart:async';
import 'package:flutter/material.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/entities/asset.entity.dart';
import 'package:immich_mobile/entities/etag.entity.dart';
import 'package:immich_mobile/entities/exif_info.entity.dart';
import 'package:immich_mobile/entities/store.entity.dart';
import 'package:immich_mobile/entities/user.entity.dart';
import 'package:immich_mobile/providers/api.provider.dart';
import 'package:immich_mobile/providers/db.provider.dart';
import 'package:immich_mobile/services/api.service.dart';
import 'package:immich_mobile/services/sync.service.dart';
import 'package:immich_mobile/services/user.service.dart';
import 'package:isar/isar.dart';
import 'package:logging/logging.dart';
import 'package:maplibre_gl/maplibre_gl.dart';
@@ -21,6 +22,7 @@ final assetServiceProvider = Provider(
(ref) => AssetService(
ref.watch(apiServiceProvider),
ref.watch(syncServiceProvider),
ref.watch(userServiceProvider),
ref.watch(dbProvider),
),
);
@@ -28,24 +30,33 @@ final assetServiceProvider = Provider(
class AssetService {
final ApiService _apiService;
final SyncService _syncService;
final UserService _userService;
final log = Logger('AssetService');
final Isar _db;
AssetService(
this._apiService,
this._syncService,
this._userService,
this._db,
);
/// Checks the server for updated assets and updates the local database if
/// required. Returns `true` if there were any changes.
Future<bool> refreshRemoteAssets([User? user]) async {
user ??= Store.get<User>(StoreKey.currentUser);
Future<bool> refreshRemoteAssets() async {
final syncedUserIds = await _db.eTags.where().idProperty().findAll();
final List<User> syncedUsers = syncedUserIds.isEmpty
? []
: await _db.users
.where()
.anyOf(syncedUserIds, (q, id) => q.idEqualTo(id))
.findAll();
final Stopwatch sw = Stopwatch()..start();
final bool changes = await _syncService.syncRemoteAssetsToDb(
user,
_getRemoteAssetChanges,
_getRemoteAssets,
users: syncedUsers,
getChangedAssets: _getRemoteAssetChanges,
loadAssets: _getRemoteAssets,
refreshUsers: _userService.getUsersFromServer,
);
debugPrint("refreshRemoteAssets full took ${sw.elapsedMilliseconds}ms");
return changes;
@@ -53,14 +64,15 @@ class AssetService {
/// Returns `(null, null)` if changes are invalid -> requires full sync
Future<(List<Asset>? toUpsert, List<String>? toDelete)>
_getRemoteAssetChanges(User user, DateTime since) async {
final deleted = await _apiService.auditApi
.getAuditDeletes(since, EntityType.ASSET, userId: user.id);
if (deleted == null || deleted.needsFullSync) return (null, null);
final assetDto = await _apiService.assetApi
.getAllAssets(userId: user.id, updatedAfter: since);
if (assetDto == null) return (null, null);
return (assetDto.map(Asset.remote).toList(), deleted.ids);
_getRemoteAssetChanges(List<User> users, DateTime since) async {
final dto = AssetDeltaSyncDto(
updatedAfter: since,
userIds: users.map((e) => e.id).toList(),
);
final changes = await _apiService.syncApi.getDeltaSync(dto);
return changes == null || changes.needsFullSync
? (null, null)
: (changes.upserted.map(Asset.remote).toList(), changes.deleted);
}
/// Returns the list of people of the given asset id.
@@ -85,38 +97,32 @@ class AssetService {
}
/// Returns `null` if the server state did not change, else list of assets
Future<List<Asset>?> _getRemoteAssets(User user) async {
Future<List<Asset>?> _getRemoteAssets(User user, DateTime until) async {
const int chunkSize = 10000;
try {
final DateTime now = DateTime.now().toUtc();
final List<Asset> allAssets = [];
for (int i = 0;; i += chunkSize) {
final List<AssetResponseDto>? assets =
await _apiService.assetApi.getAllAssets(
DateTime? lastCreationDate;
String? lastId;
// will break on error or once all assets are loaded
while (true) {
final dto = AssetFullSyncDto(
limit: chunkSize,
updatedUntil: until,
lastId: lastId,
lastCreationDate: lastCreationDate,
userId: user.id,
// updatedBefore is important! without it we could
// a) get the same Asset multiple times in different versions (when
// the asset is modified while the chunks are loaded from the server)
// b) miss assets when new assets are inserted in between the calls
updatedBefore: now,
skip: i,
take: chunkSize,
);
if (assets == null) {
return null;
}
final List<AssetResponseDto>? assets =
await _apiService.syncApi.getFullSyncForUser(dto);
if (assets == null) return null;
allAssets.addAll(assets.map(Asset.remote));
if (assets.length < chunkSize) {
break;
}
if (assets.isEmpty) break;
lastCreationDate = assets.last.fileCreatedAt;
lastId = assets.last.id;
}
return allAssets;
} catch (error, stack) {
log.severe(
'Error while getting remote assets',
error,
stack,
);
log.severe('Error while getting remote assets', error, stack);
return null;
}
}
+10 -6
View File
@@ -37,12 +37,16 @@ class MemoryService {
List<Memory> memories = [];
for (final MemoryLaneResponseDto(:title, :assets) in data) {
memories.add(
Memory(
title: title,
assets: await _db.assets.getAllByRemoteId(assets.map((e) => e.id)),
),
);
final dbAssets =
await _db.assets.getAllByRemoteId(assets.map((e) => e.id));
if (dbAssets.isNotEmpty) {
memories.add(
Memory(
title: title,
assets: dbAssets,
),
);
}
}
return memories.isNotEmpty ? memories : null;
+59 -21
View File
@@ -40,18 +40,20 @@ class SyncService {
/// Syncs remote assets owned by the logged-in user to the DB
/// Returns `true` if there were any changes
Future<bool> syncRemoteAssetsToDb(
User user,
Future<(List<Asset>? toUpsert, List<String>? toDelete)> Function(
User user,
Future<bool> syncRemoteAssetsToDb({
required List<User> users,
required Future<(List<Asset>? toUpsert, List<String>? toDelete)> Function(
List<User> users,
DateTime since,
) getChangedAssets,
FutureOr<List<Asset>?> Function(User user) loadAssets,
) =>
required FutureOr<List<Asset>?> Function(User user, DateTime until)
loadAssets,
required FutureOr<List<User>?> Function() refreshUsers,
}) =>
_lock.run(
() async =>
await _syncRemoteAssetChanges(user, getChangedAssets) ??
await _syncRemoteAssetsFull(user, loadAssets),
await _syncRemoteAssetChanges(users, getChangedAssets) ??
await _syncRemoteAssetsFull(refreshUsers, loadAssets),
);
/// Syncs remote albums to the database
@@ -111,7 +113,8 @@ class SyncService {
both: (User a, User b) {
if (!a.updatedAt.isAtSameMomentAs(b.updatedAt) ||
a.isPartnerSharedBy != b.isPartnerSharedBy ||
a.isPartnerSharedWith != b.isPartnerSharedWith) {
a.isPartnerSharedWith != b.isPartnerSharedWith ||
a.inTimeline != b.inTimeline) {
toUpsert.add(a);
return true;
}
@@ -149,17 +152,22 @@ class SyncService {
/// Efficiently syncs assets via changes. Returns `null` when a full sync is required.
Future<bool?> _syncRemoteAssetChanges(
User user,
List<User> users,
Future<(List<Asset>? toUpsert, List<String>? toDelete)> Function(
User user,
List<User> users,
DateTime since,
) getChangedAssets,
) async {
final DateTime? since = _db.eTags.getByIdSync(user.id)?.time?.toUtc();
final currentUser = Store.get(StoreKey.currentUser);
final DateTime? since =
_db.eTags.getSync(currentUser.isarId)?.time?.toUtc();
if (since == null) return null;
final DateTime now = DateTime.now();
final (toUpsert, toDelete) = await getChangedAssets(user, since);
if (toUpsert == null || toDelete == null) return null;
final (toUpsert, toDelete) = await getChangedAssets(users, since);
if (toUpsert == null || toDelete == null) {
await _clearUserAssetsETag(users);
return null;
}
try {
if (toDelete.isNotEmpty) {
await handleRemoteAssetRemoval(toDelete);
@@ -169,7 +177,7 @@ class SyncService {
await upsertAssetsWithExif(updated);
}
if (toUpsert.isNotEmpty || toDelete.isNotEmpty) {
await _updateUserAssetsETag(user, now);
await _updateUserAssetsETag(users, now);
return true;
}
return false;
@@ -203,11 +211,34 @@ class SyncService {
/// Syncs assets by loading and comparing all assets from the server.
Future<bool> _syncRemoteAssetsFull(
FutureOr<List<User>?> Function() refreshUsers,
FutureOr<List<Asset>?> Function(User user, DateTime until) loadAssets,
) async {
final serverUsers = await refreshUsers();
if (serverUsers == null) {
_log.warning("_syncRemoteAssetsFull aborted because user refresh failed");
return false;
}
await _syncUsersFromServer(serverUsers);
final List<User> users = await _db.users
.filter()
.isPartnerSharedWithEqualTo(true)
.or()
.isarIdEqualTo(Store.get(StoreKey.currentUser).isarId)
.findAll();
bool changes = false;
for (User u in users) {
changes |= await _syncRemoteAssetsForUser(u, loadAssets);
}
return changes;
}
Future<bool> _syncRemoteAssetsForUser(
User user,
FutureOr<List<Asset>?> Function(User user) loadAssets,
FutureOr<List<Asset>?> Function(User user, DateTime until) loadAssets,
) async {
final DateTime now = DateTime.now().toUtc();
final List<Asset>? remote = await loadAssets(user);
final List<Asset>? remote = await loadAssets(user, now);
if (remote == null) {
return false;
}
@@ -225,7 +256,7 @@ class SyncService {
final (toAdd, toUpdate, toRemove) = _diffAssets(remote, inDb, remote: true);
if (toAdd.isEmpty && toUpdate.isEmpty && toRemove.isEmpty) {
await _updateUserAssetsETag(user, now);
await _updateUserAssetsETag([user], now);
return false;
}
final idsToDelete = toRemove.map((e) => e.id).toList();
@@ -235,12 +266,19 @@ class SyncService {
} on IsarError catch (e) {
_log.severe("Failed to sync remote assets to db", e);
}
await _updateUserAssetsETag(user, now);
await _updateUserAssetsETag([user], now);
return true;
}
Future<void> _updateUserAssetsETag(User user, DateTime time) =>
_db.writeTxn(() => _db.eTags.put(ETag(id: user.id, time: time)));
Future<void> _updateUserAssetsETag(List<User> users, DateTime time) {
final etags = users.map((u) => ETag(id: u.id, time: time)).toList();
return _db.writeTxn(() => _db.eTags.putAll(etags));
}
Future<void> _clearUserAssetsETag(List<User> users) {
final ids = users.map((u) => u.id).toList();
return _db.writeTxn(() => _db.eTags.deleteAllById(ids));
}
/// Syncs remote albums to the database
/// returns `true` if there were any changes
+8 -2
View File
@@ -70,7 +70,7 @@ class UserService {
}
}
Future<bool> refreshUsers() async {
Future<List<User>?> getUsersFromServer() async {
final List<User>? users = await _getAllUsers(isAll: true);
final List<User>? sharedBy =
await _partnerService.getPartners(PartnerDirection.sharedBy);
@@ -79,7 +79,7 @@ class UserService {
if (users == null || sharedBy == null || sharedWith == null) {
_log.warning("Failed to refresh users");
return false;
return null;
}
users.sortBy((u) => u.id);
@@ -108,6 +108,12 @@ class UserService {
onlySecond: (_) {},
);
return users;
}
Future<bool> refreshUsers() async {
final users = await getUsersFromServer();
if (users == null) return false;
return _syncService.syncUsersFromServer(users);
}
}