refactor: hashing service (#21997)

* download only backup selected assets

* android impl

* fix tests

* limit concurrent hashing to 16

* extension cleanup

* optimized hashing

* hash only selected albums

* remove concurrency limit

* address review comments

* log more info on failure

* add native cancellation

* small batch size on ios, large on android

* fix: get correct resources

* cleanup getResource

* ios better hash cancellation

* handle graceful cancellation android

* do not trigger multiple hashing ops

* ios: fix circular reference, improve cancellation

* kotlin: more cancellation checks

* no need to create result

* cancel previous task

* avoid race condition

* ensure cancellation gets called

* fix cancellation not happening

---------

Co-authored-by: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com>
Co-authored-by: mertalev <101130780+mertalev@users.noreply.github.com>
Co-authored-by: Alex <alex.tran1502@gmail.com>
This commit is contained in:
shenlong
2025-09-18 10:12:37 +05:30
committed by GitHub
parent 2411bf8374
commit 532ec10d5f
18 changed files with 662 additions and 269 deletions
+3 -1
View File
@@ -1,3 +1,5 @@
import 'dart:io';
const int noDbId = -9223372036854775808; // from Isar
const double downloadCompleted = -1;
const double downloadFailed = -2;
@@ -10,7 +12,7 @@ const int kSyncEventBatchSize = 5000;
const int kFetchLocalAssetsBatchSize = 40000;
// Hash batch limits
const int kBatchHashFileLimit = 256;
final int kBatchHashFileLimit = Platform.isIOS ? 32 : 512;
const int kBatchHashSizeLimit = 1024 * 1024 * 1024; // 1GB
// Secure storage keys
@@ -184,6 +184,7 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
try {
final backgroundSyncManager = _ref.read(backgroundSyncProvider);
final nativeSyncApi = _ref.read(nativeSyncApiProvider);
_isCleanedUp = true;
_ref.dispose();
@@ -199,7 +200,7 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
_drift.close(),
_driftLogger.close(),
backgroundSyncManager.cancel(),
backgroundSyncManager.cancelLocal(),
nativeSyncApi.cancelHashing(),
];
if (_isar.isOpen) {
+43 -57
View File
@@ -1,20 +1,18 @@
import 'dart:convert';
import 'package:flutter/services.dart';
import 'package:immich_mobile/constants/constants.dart';
import 'package:immich_mobile/domain/models/album/local_album.model.dart';
import 'package:immich_mobile/domain/models/asset/base_asset.model.dart';
import 'package:immich_mobile/infrastructure/repositories/local_album.repository.dart';
import 'package:immich_mobile/infrastructure/repositories/local_asset.repository.dart';
import 'package:immich_mobile/infrastructure/repositories/storage.repository.dart';
import 'package:immich_mobile/platform/native_sync_api.g.dart';
import 'package:logging/logging.dart';
const String _kHashCancelledCode = "HASH_CANCELLED";
class HashService {
final int batchSizeLimit;
final int batchFileLimit;
final int _batchSize;
final DriftLocalAlbumRepository _localAlbumRepository;
final DriftLocalAssetRepository _localAssetRepository;
final StorageRepository _storageRepository;
final NativeSyncApi _nativeSyncApi;
final bool Function()? _cancelChecker;
final _log = Logger('HashService');
@@ -22,37 +20,42 @@ class HashService {
HashService({
required DriftLocalAlbumRepository localAlbumRepository,
required DriftLocalAssetRepository localAssetRepository,
required StorageRepository storageRepository,
required NativeSyncApi nativeSyncApi,
bool Function()? cancelChecker,
this.batchSizeLimit = kBatchHashSizeLimit,
this.batchFileLimit = kBatchHashFileLimit,
int? batchSize,
}) : _localAlbumRepository = localAlbumRepository,
_localAssetRepository = localAssetRepository,
_storageRepository = storageRepository,
_cancelChecker = cancelChecker,
_nativeSyncApi = nativeSyncApi;
_nativeSyncApi = nativeSyncApi,
_batchSize = batchSize ?? kBatchHashFileLimit;
bool get isCancelled => _cancelChecker?.call() ?? false;
Future<void> hashAssets() async {
_log.info("Starting hashing of assets");
final Stopwatch stopwatch = Stopwatch()..start();
// Sorted by backupSelection followed by isCloud
final localAlbums = await _localAlbumRepository.getAll(
sortBy: {SortLocalAlbumsBy.backupSelection, SortLocalAlbumsBy.isIosSharedAlbum},
);
try {
// Sorted by backupSelection followed by isCloud
final localAlbums = await _localAlbumRepository.getBackupAlbums();
for (final album in localAlbums) {
if (isCancelled) {
_log.warning("Hashing cancelled. Stopped processing albums.");
break;
}
for (final album in localAlbums) {
if (isCancelled) {
_log.warning("Hashing cancelled. Stopped processing albums.");
break;
}
final assetsToHash = await _localAlbumRepository.getAssetsToHash(album.id);
if (assetsToHash.isNotEmpty) {
await _hashAssets(album, assetsToHash);
final assetsToHash = await _localAlbumRepository.getAssetsToHash(album.id);
if (assetsToHash.isNotEmpty) {
await _hashAssets(album, assetsToHash);
}
}
} on PlatformException catch (e) {
if (e.code == _kHashCancelledCode) {
_log.warning("Hashing cancelled by platform");
return;
}
} catch (e, s) {
_log.severe("Error during hashing", e, s);
}
stopwatch.stop();
@@ -63,8 +66,7 @@ class HashService {
/// with hash for those that were successfully hashed. Hashes are looked up in a table
/// [LocalAssetHashEntity] by local id. Only missing entries are newly hashed and added to the DB.
Future<void> _hashAssets(LocalAlbum album, List<LocalAsset> assetsToHash) async {
int bytesProcessed = 0;
final toHash = <_AssetToPath>[];
final toHash = <String, LocalAsset>{};
for (final asset in assetsToHash) {
if (isCancelled) {
@@ -72,21 +74,10 @@ class HashService {
return;
}
final file = await _storageRepository.getFileForAsset(asset.id);
if (file == null) {
_log.warning(
"Cannot get file for asset ${asset.id}, name: ${asset.name}, created on: ${asset.createdAt} from album: ${album.name}",
);
continue;
}
bytesProcessed += await file.length();
toHash.add(_AssetToPath(asset: asset, path: file.path));
if (toHash.length >= batchFileLimit || bytesProcessed >= batchSizeLimit) {
toHash[asset.id] = asset;
if (toHash.length == _batchSize) {
await _processBatch(album, toHash);
toHash.clear();
bytesProcessed = 0;
}
}
@@ -94,33 +85,36 @@ class HashService {
}
/// Processes a batch of assets.
Future<void> _processBatch(LocalAlbum album, List<_AssetToPath> toHash) async {
Future<void> _processBatch(LocalAlbum album, Map<String, LocalAsset> toHash) async {
if (toHash.isEmpty) {
return;
}
_log.fine("Hashing ${toHash.length} files");
final hashed = <LocalAsset>[];
final hashes = await _nativeSyncApi.hashPaths(toHash.map((e) => e.path).toList());
final hashed = <String, String>{};
final hashResults = await _nativeSyncApi.hashAssets(
toHash.keys.toList(),
allowNetworkAccess: album.backupSelection == BackupSelection.selected,
);
assert(
hashes.length == toHash.length,
"Hashes length does not match toHash length: ${hashes.length} != ${toHash.length}",
hashResults.length == toHash.length,
"Hashes length does not match toHash length: ${hashResults.length} != ${toHash.length}",
);
for (int i = 0; i < hashes.length; i++) {
for (int i = 0; i < hashResults.length; i++) {
if (isCancelled) {
_log.warning("Hashing cancelled. Stopped processing batch.");
return;
}
final hash = hashes[i];
final asset = toHash[i].asset;
if (hash?.length == 20) {
hashed.add(asset.copyWith(checksum: base64.encode(hash!)));
final hashResult = hashResults[i];
if (hashResult.hash != null) {
hashed[hashResult.assetId] = hashResult.hash!;
} else {
final asset = toHash[hashResult.assetId];
_log.warning(
"Failed to hash file for ${asset.id}: ${asset.name} created at ${asset.createdAt} from album: ${album.name}",
"Failed to hash asset with id: ${hashResult.assetId}, name: ${asset?.name}, createdAt: ${asset?.createdAt}, from album: ${album.name}. Error: ${hashResult.error ?? "unknown"}",
);
}
}
@@ -128,13 +122,5 @@ class HashService {
_log.fine("Hashed ${hashed.length}/${toHash.length} assets");
await _localAssetRepository.updateHashes(hashed);
await _storageRepository.clearCache();
}
}
class _AssetToPath {
final LocalAsset asset;
final String path;
const _AssetToPath({required this.asset, required this.path});
}
@@ -36,17 +36,17 @@ class DriftLocalAssetRepository extends DriftDatabaseRepository {
Stream<LocalAsset?> watch(String id) => _assetSelectable(id).watchSingleOrNull();
Future<void> updateHashes(Iterable<LocalAsset> hashes) {
Future<void> updateHashes(Map<String, String> hashes) {
if (hashes.isEmpty) {
return Future.value();
}
return _db.batch((batch) async {
for (final asset in hashes) {
for (final entry in hashes.entries) {
batch.update(
_db.localAssetEntity,
LocalAssetEntityCompanion(checksum: Value(asset.checksum)),
where: (e) => e.id.equals(asset.id),
LocalAssetEntityCompanion(checksum: Value(entry.value)),
where: (e) => e.id.equals(entry.key),
);
}
});
@@ -1,3 +1,4 @@
import 'dart:async';
import 'dart:io';
import 'package:auto_route/auto_route.dart';
@@ -5,12 +6,14 @@ import 'package:easy_localization/easy_localization.dart';
import 'package:flutter/material.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/domain/models/album/local_album.model.dart';
import 'package:immich_mobile/domain/services/sync_linked_album.service.dart';
import 'package:immich_mobile/extensions/build_context_extensions.dart';
import 'package:immich_mobile/extensions/translate_extensions.dart';
import 'package:immich_mobile/providers/app_settings.provider.dart';
import 'package:immich_mobile/domain/services/sync_linked_album.service.dart';
import 'package:immich_mobile/providers/background_sync.provider.dart';
import 'package:immich_mobile/providers/backup/backup_album.provider.dart';
import 'package:immich_mobile/providers/backup/drift_backup.provider.dart';
import 'package:immich_mobile/providers/infrastructure/platform.provider.dart';
import 'package:immich_mobile/providers/user.provider.dart';
import 'package:immich_mobile/services/app_settings.service.dart';
import 'package:immich_mobile/widgets/backup/drift_album_info_list_tile.dart';
@@ -64,16 +67,6 @@ class _DriftBackupAlbumSelectionPageState extends ConsumerState<DriftBackupAlbum
});
await _handleLinkedAlbumFuture;
}
// Restart backup if total count changed and backup is enabled
final currentTotalAssetCount = ref.read(driftBackupProvider.select((p) => p.totalCount));
final totalChanged = currentTotalAssetCount != _initialTotalAssetCount;
final isBackupEnabled = ref.read(appSettingsServiceProvider).getSetting(AppSettingsEnum.enableBackup);
if (totalChanged && isBackupEnabled) {
await ref.read(driftBackupProvider.notifier).cancel();
await ref.read(driftBackupProvider.notifier).startBackup(user.id);
}
}
@override
@@ -102,6 +95,27 @@ class _DriftBackupAlbumSelectionPageState extends ConsumerState<DriftBackupAlbum
onPopInvokedWithResult: (didPop, _) async {
if (!didPop) {
await _handlePagePopped();
final user = ref.read(currentUserProvider);
if (user == null) {
return;
}
final isBackupEnabled = ref.read(appSettingsServiceProvider).getSetting(AppSettingsEnum.enableBackup);
await ref.read(driftBackupProvider.notifier).getBackupStatus(user.id);
final currentTotalAssetCount = ref.read(driftBackupProvider.select((p) => p.totalCount));
final totalChanged = currentTotalAssetCount != _initialTotalAssetCount;
final backupNotifier = ref.read(driftBackupProvider.notifier);
final backgroundSync = ref.read(backgroundSyncProvider);
final nativeSync = ref.read(nativeSyncApiProvider);
if (totalChanged) {
// Waits for hashing to be cancelled before starting a new one
unawaited(nativeSync.cancelHashing().whenComplete(() => backgroundSync.hashAssets()));
if (isBackupEnabled) {
unawaited(backupNotifier.cancel().whenComplete(() => backupNotifier.startBackup(user.id)));
}
}
Navigator.of(context).pop();
}
},
+71 -4
View File
@@ -205,6 +205,45 @@ class SyncDelta {
int get hashCode => Object.hashAll(_toList());
}
class HashResult {
HashResult({required this.assetId, this.error, this.hash});
String assetId;
String? error;
String? hash;
List<Object?> _toList() {
return <Object?>[assetId, error, hash];
}
Object encode() {
return _toList();
}
static HashResult decode(Object result) {
result as List<Object?>;
return HashResult(assetId: result[0]! as String, error: result[1] as String?, hash: result[2] as String?);
}
@override
// ignore: avoid_equals_and_hash_code_on_mutable_classes
bool operator ==(Object other) {
if (other is! HashResult || other.runtimeType != runtimeType) {
return false;
}
if (identical(this, other)) {
return true;
}
return _deepEquals(encode(), other.encode());
}
@override
// ignore: avoid_equals_and_hash_code_on_mutable_classes
int get hashCode => Object.hashAll(_toList());
}
class _PigeonCodec extends StandardMessageCodec {
const _PigeonCodec();
@override
@@ -221,6 +260,9 @@ class _PigeonCodec extends StandardMessageCodec {
} else if (value is SyncDelta) {
buffer.putUint8(131);
writeValue(buffer, value.encode());
} else if (value is HashResult) {
buffer.putUint8(132);
writeValue(buffer, value.encode());
} else {
super.writeValue(buffer, value);
}
@@ -235,6 +277,8 @@ class _PigeonCodec extends StandardMessageCodec {
return PlatformAlbum.decode(readValue(buffer)!);
case 131:
return SyncDelta.decode(readValue(buffer)!);
case 132:
return HashResult.decode(readValue(buffer)!);
default:
return super.readValueOfType(type, buffer);
}
@@ -468,15 +512,15 @@ class NativeSyncApi {
}
}
Future<List<Uint8List?>> hashPaths(List<String> paths) async {
Future<List<HashResult>> hashAssets(List<String> assetIds, {bool allowNetworkAccess = false}) async {
final String pigeonVar_channelName =
'dev.flutter.pigeon.immich_mobile.NativeSyncApi.hashPaths$pigeonVar_messageChannelSuffix';
'dev.flutter.pigeon.immich_mobile.NativeSyncApi.hashAssets$pigeonVar_messageChannelSuffix';
final BasicMessageChannel<Object?> pigeonVar_channel = BasicMessageChannel<Object?>(
pigeonVar_channelName,
pigeonChannelCodec,
binaryMessenger: pigeonVar_binaryMessenger,
);
final Future<Object?> pigeonVar_sendFuture = pigeonVar_channel.send(<Object?>[paths]);
final Future<Object?> pigeonVar_sendFuture = pigeonVar_channel.send(<Object?>[assetIds, allowNetworkAccess]);
final List<Object?>? pigeonVar_replyList = await pigeonVar_sendFuture as List<Object?>?;
if (pigeonVar_replyList == null) {
throw _createConnectionError(pigeonVar_channelName);
@@ -492,7 +536,30 @@ class NativeSyncApi {
message: 'Host platform returned null value for non-null return value.',
);
} else {
return (pigeonVar_replyList[0] as List<Object?>?)!.cast<Uint8List?>();
return (pigeonVar_replyList[0] as List<Object?>?)!.cast<HashResult>();
}
}
Future<void> cancelHashing() async {
final String pigeonVar_channelName =
'dev.flutter.pigeon.immich_mobile.NativeSyncApi.cancelHashing$pigeonVar_messageChannelSuffix';
final BasicMessageChannel<Object?> pigeonVar_channel = BasicMessageChannel<Object?>(
pigeonVar_channelName,
pigeonChannelCodec,
binaryMessenger: pigeonVar_binaryMessenger,
);
final Future<Object?> pigeonVar_sendFuture = pigeonVar_channel.send(null);
final List<Object?>? pigeonVar_replyList = await pigeonVar_sendFuture as List<Object?>?;
if (pigeonVar_replyList == null) {
throw _createConnectionError(pigeonVar_channelName);
} else if (pigeonVar_replyList.length > 1) {
throw PlatformException(
code: pigeonVar_replyList[0]! as String,
message: pigeonVar_replyList[1] as String?,
details: pigeonVar_replyList[2],
);
} else {
return;
}
}
}
@@ -10,7 +10,6 @@ import 'package:immich_mobile/providers/infrastructure/asset.provider.dart';
import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart';
import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
import 'package:immich_mobile/providers/infrastructure/platform.provider.dart';
import 'package:immich_mobile/providers/infrastructure/storage.provider.dart';
final syncStreamServiceProvider = Provider(
(ref) => SyncStreamService(
@@ -35,7 +34,6 @@ final hashServiceProvider = Provider(
(ref) => HashService(
localAlbumRepository: ref.watch(localAlbumRepository),
localAssetRepository: ref.watch(localAssetRepository),
storageRepository: ref.watch(storageRepositoryProvider),
nativeSyncApi: ref.watch(nativeSyncApiProvider),
),
);
+3 -2
View File
@@ -16,9 +16,10 @@ class HashService {
required IsarDeviceAssetRepository deviceAssetRepository,
required BackgroundService backgroundService,
this.batchSizeLimit = kBatchHashSizeLimit,
this.batchFileLimit = kBatchHashFileLimit,
int? batchFileLimit,
}) : _deviceAssetRepository = deviceAssetRepository,
_backgroundService = backgroundService;
_backgroundService = backgroundService,
batchFileLimit = batchFileLimit ?? kBatchHashFileLimit;
final IsarDeviceAssetRepository _deviceAssetRepository;
final BackgroundService _backgroundService;