refactor(mobile): device asset entity to use modified time (#17064)

* refactor: device asset entity to use modified time

* chore: cleanup

* refactor: remove album media dependency from hashservice

* refactor: return updated copy of asset

* add hash service tests

* chore: rename hash batch constants

* chore: log the number of assets processed during migration

* chore: more logs

* refactor: use lookup and more tests

* use sort approach

* refactor hash service to use for loop instead

* refactor: rename to getByIds

---------

Co-authored-by: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com>
This commit is contained in:
shenlong
2025-04-04 01:12:35 +05:30
committed by GitHub
parent e8b4ac0522
commit 97e52c5156
23 changed files with 1801 additions and 185 deletions
+162 -129
View File
@@ -1,172 +1,205 @@
// ignore_for_file: avoid-unsafe-collection-methods
import 'dart:convert';
import 'dart:io';
import 'package:flutter/foundation.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/entities/album.entity.dart';
import 'package:immich_mobile/interfaces/album_media.interface.dart';
import 'package:immich_mobile/interfaces/asset.interface.dart';
import 'package:immich_mobile/repositories/album_media.repository.dart';
import 'package:immich_mobile/repositories/asset.repository.dart';
import 'package:immich_mobile/services/background.service.dart';
import 'package:immich_mobile/entities/android_device_asset.entity.dart';
import 'package:immich_mobile/constants/constants.dart';
import 'package:immich_mobile/domain/interfaces/device_asset.interface.dart';
import 'package:immich_mobile/domain/models/device_asset.model.dart';
import 'package:immich_mobile/entities/asset.entity.dart';
import 'package:immich_mobile/entities/device_asset.entity.dart';
import 'package:immich_mobile/entities/ios_device_asset.entity.dart';
import 'package:immich_mobile/extensions/string_extensions.dart';
import 'package:immich_mobile/providers/infrastructure/device_asset.provider.dart';
import 'package:immich_mobile/services/background.service.dart';
import 'package:logging/logging.dart';
class HashService {
HashService(
this._assetRepository,
this._backgroundService,
this._albumMediaRepository,
);
final IAssetRepository _assetRepository;
final BackgroundService _backgroundService;
final IAlbumMediaRepository _albumMediaRepository;
final _log = Logger('HashService');
HashService({
required IDeviceAssetRepository deviceAssetRepository,
required BackgroundService backgroundService,
this.batchSizeLimit = kBatchHashSizeLimit,
this.batchFileLimit = kBatchHashFileLimit,
}) : _deviceAssetRepository = deviceAssetRepository,
_backgroundService = backgroundService;
/// Returns all assets that were successfully hashed
Future<List<Asset>> getHashedAssets(
Album album, {
int start = 0,
int end = 0x7fffffffffffffff,
DateTime? modifiedFrom,
DateTime? modifiedUntil,
Set<String>? excludedAssets,
}) async {
final entities = await _albumMediaRepository.getAssets(
album.localId!,
start: start,
end: end,
modifiedFrom: modifiedFrom,
modifiedUntil: modifiedUntil,
);
final filtered = excludedAssets == null
? entities
: entities.where((e) => !excludedAssets.contains(e.localId!)).toList();
return _hashAssets(filtered);
}
final IDeviceAssetRepository _deviceAssetRepository;
final BackgroundService _backgroundService;
final int batchSizeLimit;
final int batchFileLimit;
final _log = Logger('HashService');
/// Processes a list of local [Asset]s, storing their hash and returning only those
/// that were successfully hashed. Hashes are looked up in a DB table
/// [AndroidDeviceAsset] / [IOSDeviceAsset] by local id. Only missing
/// entries are newly hashed and added to the DB table.
Future<List<Asset>> _hashAssets(List<Asset> assets) async {
const int batchFileCount = 128;
const int batchDataSize = 1024 * 1024 * 1024; // 1GB
/// [DeviceAsset] by local id. Only missing entries are newly hashed and added to the DB table.
Future<List<Asset>> hashAssets(List<Asset> assets) async {
assets.sort(Asset.compareByLocalId);
final ids = assets
.map(Platform.isAndroid ? (a) => a.localId!.toInt() : (a) => a.localId!)
.toList();
final List<DeviceAsset?> hashes =
await _assetRepository.getDeviceAssetsById(ids);
final List<DeviceAsset> toAdd = [];
final List<String> toHash = [];
// Get and sort DB entries - guaranteed to be a subset of assets
final hashesInDB = await _deviceAssetRepository.getByIds(
assets.map((a) => a.localId!).toList(),
);
hashesInDB.sort((a, b) => a.assetId.compareTo(b.assetId));
int bytes = 0;
int dbIndex = 0;
int bytesProcessed = 0;
final hashedAssets = <Asset>[];
final toBeHashed = <_AssetPath>[];
final toBeDeleted = <String>[];
for (int i = 0; i < assets.length; i++) {
if (hashes[i] != null) {
for (int assetIndex = 0; assetIndex < assets.length; assetIndex++) {
final asset = assets[assetIndex];
DeviceAsset? matchingDbEntry;
if (dbIndex < hashesInDB.length) {
final deviceAsset = hashesInDB[dbIndex];
if (deviceAsset.assetId == asset.localId) {
matchingDbEntry = deviceAsset;
dbIndex++;
}
}
if (matchingDbEntry != null &&
matchingDbEntry.hash.isNotEmpty &&
matchingDbEntry.modifiedTime.isAtSameMomentAs(asset.fileModifiedAt)) {
// Reuse the existing hash
hashedAssets.add(
asset.copyWith(checksum: base64.encode(matchingDbEntry.hash)),
);
continue;
}
File? file;
try {
file = await assets[i].local!.originFile;
} catch (error, stackTrace) {
_log.warning(
"Error getting file to hash for asset ${assets[i].localId}, name: ${assets[i].fileName}, created on: ${assets[i].fileCreatedAt}, skipping",
error,
stackTrace,
);
}
final file = await _tryGetAssetFile(asset);
if (file == null) {
final fileName = assets[i].fileName;
_log.warning(
"Failed to get file for asset ${assets[i].localId}, name: $fileName, created on: ${assets[i].fileCreatedAt}, skipping",
);
// Can't access file, delete any DB entry
if (matchingDbEntry != null) {
toBeDeleted.add(matchingDbEntry.assetId);
}
continue;
}
bytes += await file.length();
toHash.add(file.path);
final deviceAsset = Platform.isAndroid
? AndroidDeviceAsset(id: ids[i] as int, hash: const [])
: IOSDeviceAsset(id: ids[i] as String, hash: const []);
toAdd.add(deviceAsset);
hashes[i] = deviceAsset;
if (toHash.length == batchFileCount || bytes >= batchDataSize) {
await _processBatch(toHash, toAdd);
toAdd.clear();
toHash.clear();
bytes = 0;
bytesProcessed += await file.length();
toBeHashed.add(_AssetPath(asset: asset, path: file.path));
if (_shouldProcessBatch(toBeHashed.length, bytesProcessed)) {
hashedAssets.addAll(await _processBatch(toBeHashed, toBeDeleted));
toBeHashed.clear();
toBeDeleted.clear();
bytesProcessed = 0;
}
}
if (toHash.isNotEmpty) {
await _processBatch(toHash, toAdd);
assert(dbIndex == hashesInDB.length, "All hashes should've been processed");
// Process any remaining files
if (toBeHashed.isNotEmpty) {
hashedAssets.addAll(await _processBatch(toBeHashed, toBeDeleted));
}
return _getHashedAssets(assets, hashes);
// Clean up deleted references
if (toBeDeleted.isNotEmpty) {
await _deviceAssetRepository.deleteIds(toBeDeleted);
}
return hashedAssets;
}
/// Processes a batch of files and saves any successfully hashed
/// values to the DB table.
Future<void> _processBatch(
final List<String> toHash,
final List<DeviceAsset> toAdd,
bool _shouldProcessBatch(int assetCount, int bytesProcessed) =>
assetCount >= batchFileLimit || bytesProcessed >= batchSizeLimit;
Future<File?> _tryGetAssetFile(Asset asset) async {
try {
final file = await asset.local!.originFile;
if (file == null) {
_log.warning(
"Failed to get file for asset ${asset.localId ?? '<N/A>'}, name: ${asset.fileName}, created on: ${asset.fileCreatedAt}, skipping",
);
return null;
}
return file;
} catch (error, stackTrace) {
_log.warning(
"Error getting file to hash for asset ${asset.localId ?? '<N/A>'}, name: ${asset.fileName}, created on: ${asset.fileCreatedAt}, skipping",
error,
stackTrace,
);
return null;
}
}
/// Processes a batch of files and returns a list of successfully hashed assets after saving
/// them in [DeviceAssetToHash] for future retrieval
Future<List<Asset>> _processBatch(
List<_AssetPath> toBeHashed,
List<String> toBeDeleted,
) async {
final hashes = await _hashFiles(toHash);
bool anyNull = false;
for (int j = 0; j < hashes.length; j++) {
if (hashes[j]?.length == 20) {
toAdd[j].hash = hashes[j]!;
_log.info("Hashing ${toBeHashed.length} files");
final hashes = await _hashFiles(toBeHashed.map((e) => e.path).toList());
assert(
hashes.length == toBeHashed.length,
"Number of Hashes returned from platform should be the same as the input",
);
final hashedAssets = <Asset>[];
final toBeAdded = <DeviceAsset>[];
for (final (index, hash) in hashes.indexed) {
final asset = toBeHashed.elementAtOrNull(index)?.asset;
if (asset != null && hash?.length == 20) {
hashedAssets.add(asset.copyWith(checksum: base64.encode(hash!)));
toBeAdded.add(
DeviceAsset(
assetId: asset.localId!,
hash: hash,
modifiedTime: asset.fileModifiedAt,
),
);
} else {
_log.warning("Failed to hash file ${toHash[j]}, skipping");
anyNull = true;
_log.warning("Failed to hash file ${asset?.localId ?? '<null>'}");
if (asset != null) {
toBeDeleted.add(asset.localId!);
}
}
}
final validHashes = anyNull
? toAdd.where((e) => e.hash.length == 20).toList(growable: false)
: toAdd;
await _assetRepository
.transaction(() => _assetRepository.upsertDeviceAssets(validHashes));
_log.fine("Hashed ${validHashes.length}/${toHash.length} assets");
// Update the DB for future retrieval
await _deviceAssetRepository.transaction(() async {
await _deviceAssetRepository.updateAll(toBeAdded);
await _deviceAssetRepository.deleteIds(toBeDeleted);
});
_log.fine("Hashed ${hashedAssets.length}/${toBeHashed.length} assets");
return hashedAssets;
}
/// Hashes the given files and returns a list of the same length
/// files that could not be hashed have a `null` value
/// Hashes the given files and returns a list of the same length.
/// Files that could not be hashed will have a `null` value
Future<List<Uint8List?>> _hashFiles(List<String> paths) async {
final List<Uint8List?>? hashes =
await _backgroundService.digestFiles(paths);
if (hashes == null) {
throw Exception("Hashing ${paths.length} files failed");
}
return hashes;
}
/// Returns all successfully hashed [Asset]s with their hash value set
List<Asset> _getHashedAssets(
List<Asset> assets,
List<DeviceAsset?> hashes,
) {
final List<Asset> result = [];
for (int i = 0; i < assets.length; i++) {
if (hashes[i] != null && hashes[i]!.hash.isNotEmpty) {
assets[i].byteHash = hashes[i]!.hash;
result.add(assets[i]);
try {
final hashes = await _backgroundService.digestFiles(paths);
if (hashes != null) {
return hashes;
}
_log.severe("Hashing ${paths.length} files failed");
} catch (e, s) {
_log.severe("Error occurred while hashing assets", e, s);
}
return result;
return List.filled(paths.length, null);
}
}
class _AssetPath {
final Asset asset;
final String path;
const _AssetPath({required this.asset, required this.path});
_AssetPath copyWith({Asset? asset, String? path}) {
return _AssetPath(asset: asset ?? this.asset, path: path ?? this.path);
}
}
final hashServiceProvider = Provider(
(ref) => HashService(
ref.watch(assetRepositoryProvider),
ref.watch(backgroundServiceProvider),
ref.watch(albumMediaRepositoryProvider),
deviceAssetRepository: ref.watch(deviceAssetRepositoryProvider),
backgroundService: ref.watch(backgroundServiceProvider),
),
);
+42 -8
View File
@@ -577,15 +577,18 @@ class SyncService {
Set<String>? excludedAssets,
bool forceRefresh = false,
]) async {
_log.info("Syncing a local album to DB: ${deviceAlbum.name}");
if (!forceRefresh && !await _hasAlbumChangeOnDevice(deviceAlbum, dbAlbum)) {
_log.fine(
_log.info(
"Local album ${deviceAlbum.name} has not changed. Skipping sync.",
);
return false;
}
_log.info("Local album ${deviceAlbum.name} has changed. Syncing...");
if (!forceRefresh &&
excludedAssets == null &&
await _syncDeviceAlbumFast(deviceAlbum, dbAlbum)) {
_log.info("Fast synced local album ${deviceAlbum.name} to DB");
return true;
}
// general case, e.g. some assets have been deleted or there are excluded albums on iOS
@@ -598,7 +601,7 @@ class SyncService {
assert(inDb.isSorted(Asset.compareByChecksum), "inDb not sorted!");
final int assetCountOnDevice =
await _albumMediaRepository.getAssetCount(deviceAlbum.localId!);
final List<Asset> onDevice = await _hashService.getHashedAssets(
final List<Asset> onDevice = await _getHashedAssets(
deviceAlbum,
excludedAssets: excludedAssets,
);
@@ -611,7 +614,7 @@ class SyncService {
dbAlbum.name == deviceAlbum.name &&
dbAlbum.modifiedAt.isAtSameMomentAs(deviceAlbum.modifiedAt)) {
// changes only affeted excluded albums
_log.fine(
_log.info(
"Only excluded assets in local album ${deviceAlbum.name} changed. Stopping sync.",
);
if (assetCountOnDevice !=
@@ -626,11 +629,11 @@ class SyncService {
}
return false;
}
_log.fine(
_log.info(
"Syncing local album ${deviceAlbum.name}. ${toAdd.length} assets to add, ${toUpdate.length} to update, ${toDelete.length} to delete",
);
final (existingInDb, updated) = await _linkWithExistingFromDb(toAdd);
_log.fine(
_log.info(
"Linking assets to add with existing from db. ${existingInDb.length} existing, ${updated.length} to update",
);
deleteCandidates.addAll(toDelete);
@@ -667,6 +670,9 @@ class SyncService {
/// returns `true` if successful, else `false`
Future<bool> _syncDeviceAlbumFast(Album deviceAlbum, Album dbAlbum) async {
if (!deviceAlbum.modifiedAt.isAfter(dbAlbum.modifiedAt)) {
_log.info(
"Local album ${deviceAlbum.name} has not changed. Skipping sync.",
);
return false;
}
final int totalOnDevice =
@@ -676,15 +682,21 @@ class SyncService {
?.assetCount ??
0;
if (totalOnDevice <= lastKnownTotal) {
_log.info(
"Local album ${deviceAlbum.name} totalOnDevice is less than lastKnownTotal. Skipping sync.",
);
return false;
}
final List<Asset> newAssets = await _hashService.getHashedAssets(
final List<Asset> newAssets = await _getHashedAssets(
deviceAlbum,
modifiedFrom: dbAlbum.modifiedAt.add(const Duration(seconds: 1)),
modifiedUntil: deviceAlbum.modifiedAt,
);
if (totalOnDevice != lastKnownTotal + newAssets.length) {
_log.info(
"Local album ${deviceAlbum.name} totalOnDevice is not equal to lastKnownTotal + newAssets.length. Skipping sync.",
);
return false;
}
dbAlbum.modifiedAt = deviceAlbum.modifiedAt;
@@ -719,8 +731,8 @@ class SyncService {
List<Asset> existing, [
Set<String>? excludedAssets,
]) async {
_log.info("Syncing a new local album to DB: ${album.name}");
final assets = await _hashService.getHashedAssets(
_log.info("Adding a new local album to DB: ${album.name}");
final assets = await _getHashedAssets(
album,
excludedAssets: excludedAssets,
);
@@ -824,6 +836,28 @@ class SyncService {
}
}
/// Returns all assets that were successfully hashed
Future<List<Asset>> _getHashedAssets(
Album album, {
int start = 0,
int end = 0x7fffffffffffffff,
DateTime? modifiedFrom,
DateTime? modifiedUntil,
Set<String>? excludedAssets,
}) async {
final entities = await _albumMediaRepository.getAssets(
album.localId!,
start: start,
end: end,
modifiedFrom: modifiedFrom,
modifiedUntil: modifiedUntil,
);
final filtered = excludedAssets == null
? entities
: entities.where((e) => !excludedAssets.contains(e.localId!)).toList();
return _hashService.hashAssets(filtered);
}
List<Asset> _removeDuplicates(List<Asset> assets) {
final int before = assets.length;
assets.sort(Asset.compareByOwnerChecksumCreatedModified);