Compare commits
1 Commits
docs-datab
...
test-concu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2793086c03 |
@@ -169,7 +169,7 @@ class BackgroundSyncManager {
|
|||||||
return _linkedAlbumSyncTask!.future;
|
return _linkedAlbumSyncTask!.future;
|
||||||
}
|
}
|
||||||
|
|
||||||
_linkedAlbumSyncTask = runInIsolateGentle(computation: syncLinkedAlbumsIsolated);
|
_linkedAlbumSyncTask = runInIsolateGentle(computation: syncLinkedAlbumsIsolated, debugLabel: "LinkedAlbumSync");
|
||||||
return _linkedAlbumSyncTask!.whenComplete(() {
|
return _linkedAlbumSyncTask!.whenComplete(() {
|
||||||
_linkedAlbumSyncTask = null;
|
_linkedAlbumSyncTask = null;
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ import 'package:immich_mobile/infrastructure/entities/store.entity.dart';
|
|||||||
import 'package:immich_mobile/infrastructure/entities/user.entity.dart';
|
import 'package:immich_mobile/infrastructure/entities/user.entity.dart';
|
||||||
import 'package:immich_mobile/infrastructure/entities/user_metadata.entity.dart';
|
import 'package:immich_mobile/infrastructure/entities/user_metadata.entity.dart';
|
||||||
import 'package:immich_mobile/infrastructure/repositories/db.repository.steps.dart';
|
import 'package:immich_mobile/infrastructure/repositories/db.repository.steps.dart';
|
||||||
|
import 'package:immich_mobile/infrastructure/repositories/drirft_native.dart';
|
||||||
import 'package:isar/isar.dart' hide Index;
|
import 'package:isar/isar.dart' hide Index;
|
||||||
|
|
||||||
import 'db.repository.drift.dart';
|
import 'db.repository.drift.dart';
|
||||||
@@ -67,7 +68,7 @@ class IsarDatabaseRepository implements IDatabaseRepository {
|
|||||||
)
|
)
|
||||||
class Drift extends $Drift implements IDatabaseRepository {
|
class Drift extends $Drift implements IDatabaseRepository {
|
||||||
Drift([QueryExecutor? executor])
|
Drift([QueryExecutor? executor])
|
||||||
: super(executor ?? driftDatabase(name: 'immich', native: const DriftNativeOptions(shareAcrossIsolates: true)));
|
: super(executor ?? driftDatabaseTest(name: 'immich', native: const DriftNativeOptions(shareAcrossIsolates: true)));
|
||||||
|
|
||||||
@override
|
@override
|
||||||
int get schemaVersion => 10;
|
int get schemaVersion => 10;
|
||||||
|
|||||||
151
mobile/lib/infrastructure/repositories/drirft_native.dart
Normal file
151
mobile/lib/infrastructure/repositories/drirft_native.dart
Normal file
@@ -0,0 +1,151 @@
|
|||||||
|
import 'dart:async';
|
||||||
|
import 'dart:io';
|
||||||
|
import 'dart:isolate';
|
||||||
|
import 'dart:ui';
|
||||||
|
|
||||||
|
import 'package:drift/drift.dart';
|
||||||
|
import 'package:drift/isolate.dart';
|
||||||
|
import 'package:drift/native.dart';
|
||||||
|
import 'package:drift_flutter/drift_flutter.dart';
|
||||||
|
import 'package:meta/meta.dart';
|
||||||
|
import 'package:path_provider/path_provider.dart';
|
||||||
|
import 'package:path/path.dart' as p;
|
||||||
|
import 'package:sqlite3/sqlite3.dart';
|
||||||
|
import 'package:sqlite3_flutter_libs/sqlite3_flutter_libs.dart';
|
||||||
|
|
||||||
|
@internal
|
||||||
|
bool hasConfiguredSqlite = false;
|
||||||
|
|
||||||
|
String portName(String databaseName) {
|
||||||
|
return 'drift-db/$databaseName';
|
||||||
|
}
|
||||||
|
|
||||||
|
String isolateControlPortName(String databaseName) {
|
||||||
|
return 'drift-db/$databaseName/control';
|
||||||
|
}
|
||||||
|
|
||||||
|
QueryExecutor driftDatabaseTest({required String name, DriftWebOptions? web, DriftNativeOptions? native}) {
|
||||||
|
Future<File> databaseFile() async {
|
||||||
|
if (native?.databasePath case final lookupPath?) {
|
||||||
|
return File(await lookupPath());
|
||||||
|
} else {
|
||||||
|
final resolvedDirectory = await (native?.databaseDirectory ?? getApplicationDocumentsDirectory)();
|
||||||
|
|
||||||
|
return File(
|
||||||
|
p.join(switch (resolvedDirectory) {
|
||||||
|
Directory(:final path) => path,
|
||||||
|
final String path => path,
|
||||||
|
final other => throw ArgumentError.value(
|
||||||
|
other,
|
||||||
|
'other',
|
||||||
|
'databaseDirectory on DriftNativeOptions must resolve to a '
|
||||||
|
'directory or a path as string.',
|
||||||
|
),
|
||||||
|
}, '$name.sqlite'),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return DatabaseConnection.delayed(
|
||||||
|
Future(() async {
|
||||||
|
if (!hasConfiguredSqlite) {
|
||||||
|
// Also work around limitations on old Android versions
|
||||||
|
if (Platform.isAndroid) {
|
||||||
|
await applyWorkaroundToOpenSqlite3OnOldAndroidVersions();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sqlite3 pick a more suitable location for temporary files - the
|
||||||
|
// one from the system may be inaccessible due to sandboxing.
|
||||||
|
final cachebase = await (native?.tempDirectoryPath?.call() ?? getTemporaryDirectory().then((d) => d.path));
|
||||||
|
|
||||||
|
if (cachebase != null) {
|
||||||
|
// We can't access /tmp on Android, which sqlite3 would try by default.
|
||||||
|
// Explicitly tell it about the correct temporary directory.
|
||||||
|
sqlite3.tempDirectory = cachebase;
|
||||||
|
}
|
||||||
|
|
||||||
|
hasConfiguredSqlite = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (native != null && native.shareAcrossIsolates) {
|
||||||
|
const connectTimeout = Duration(seconds: 1);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
if (IsolateNameServer.lookupPortByName(portName(name)) case final port?) {
|
||||||
|
final isolate = DriftIsolate.fromConnectPort(port);
|
||||||
|
try {
|
||||||
|
return await isolate.connect(connectTimeout: connectTimeout, isolateDebugLog: true);
|
||||||
|
} on TimeoutException {
|
||||||
|
// Isolate has stopped shortly after the register call. It should
|
||||||
|
// also remove the port mapping, so we can just try again in another
|
||||||
|
// iteration.
|
||||||
|
// However, it's possible for the isolate to become unreachable
|
||||||
|
// without unregistering itself (either due to a fatal error or when
|
||||||
|
// doing a hot restart). Check if the isolate is still reachable,
|
||||||
|
// and remove the mapping if it's not.
|
||||||
|
final controlPort = IsolateNameServer.lookupPortByName(isolateControlPortName(name));
|
||||||
|
if (controlPort == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
final supposedIsolate = Isolate(controlPort);
|
||||||
|
if (!await supposedIsolate.pingWithTimeout()) {
|
||||||
|
// Yup, gone!
|
||||||
|
IsolateNameServer.removePortNameMapping(portName(name));
|
||||||
|
}
|
||||||
|
// Otherwise, the isolate is probably paused. Keep trying...
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// No port has been registered yet! Spawn an isolate that will try to
|
||||||
|
// register itself as the database server.
|
||||||
|
final receiveFromPending = ReceivePort();
|
||||||
|
final firstMessage = receiveFromPending.first;
|
||||||
|
await Isolate.spawn(_isolateEntrypoint, (
|
||||||
|
name: name,
|
||||||
|
options: native,
|
||||||
|
sendResponses: receiveFromPending.sendPort,
|
||||||
|
path: (await databaseFile()).path,
|
||||||
|
), onExit: receiveFromPending.sendPort);
|
||||||
|
|
||||||
|
// The isolate will either succeed in registering its connect port to
|
||||||
|
// the name server (in which case it sends us the port), or it fails
|
||||||
|
// due to a race condition (in which case it exits).
|
||||||
|
final first = await firstMessage;
|
||||||
|
if (first case SendPort port) {
|
||||||
|
return await DriftIsolate.fromConnectPort(port).connect(isolateDebugLog: true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return NativeDatabase.createBackgroundConnection(await databaseFile());
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
typedef _EntrypointMessage = ({String name, String path, DriftNativeOptions options, SendPort sendResponses});
|
||||||
|
|
||||||
|
void _isolateEntrypoint(_EntrypointMessage message) {
|
||||||
|
final connections = ReceivePort();
|
||||||
|
if (IsolateNameServer.registerPortWithName(connections.sendPort, portName(message.name))) {
|
||||||
|
final controlPortName = isolateControlPortName(message.name);
|
||||||
|
final server = DriftIsolate.inCurrent(
|
||||||
|
() => NativeDatabase(File(message.path)),
|
||||||
|
port: connections,
|
||||||
|
beforeShutdown: () {
|
||||||
|
IsolateNameServer.removePortNameMapping(portName(message.name));
|
||||||
|
IsolateNameServer.removePortNameMapping(controlPortName);
|
||||||
|
},
|
||||||
|
killIsolateWhenDone: true,
|
||||||
|
shutdownAfterLastDisconnect: true,
|
||||||
|
);
|
||||||
|
|
||||||
|
message.sendResponses.send(server.connectPort);
|
||||||
|
|
||||||
|
IsolateNameServer.removePortNameMapping(controlPortName);
|
||||||
|
IsolateNameServer.registerPortWithName(Isolate.current.controlPort, controlPortName);
|
||||||
|
} else {
|
||||||
|
// Another isolate is responsible for hosting this database, abort.
|
||||||
|
connections.close();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -2,6 +2,7 @@ import 'package:drift/drift.dart';
|
|||||||
import 'package:drift_flutter/drift_flutter.dart';
|
import 'package:drift_flutter/drift_flutter.dart';
|
||||||
import 'package:immich_mobile/domain/interfaces/db.interface.dart';
|
import 'package:immich_mobile/domain/interfaces/db.interface.dart';
|
||||||
import 'package:immich_mobile/infrastructure/entities/log.entity.dart';
|
import 'package:immich_mobile/infrastructure/entities/log.entity.dart';
|
||||||
|
import 'package:immich_mobile/infrastructure/repositories/drirft_native.dart';
|
||||||
|
|
||||||
import 'logger_db.repository.drift.dart';
|
import 'logger_db.repository.drift.dart';
|
||||||
|
|
||||||
@@ -9,7 +10,7 @@ import 'logger_db.repository.drift.dart';
|
|||||||
class DriftLogger extends $DriftLogger implements IDatabaseRepository {
|
class DriftLogger extends $DriftLogger implements IDatabaseRepository {
|
||||||
DriftLogger([QueryExecutor? executor])
|
DriftLogger([QueryExecutor? executor])
|
||||||
: super(
|
: super(
|
||||||
executor ?? driftDatabase(name: 'immich_logs', native: const DriftNativeOptions(shareAcrossIsolates: true)),
|
executor ?? driftDatabaseTest(name: 'immich_logs', native: const DriftNativeOptions(shareAcrossIsolates: true)),
|
||||||
);
|
);
|
||||||
|
|
||||||
@override
|
@override
|
||||||
|
|||||||
@@ -78,9 +78,13 @@ class DriftAuthUserRepository extends DriftDatabaseRepository {
|
|||||||
if (user == null) return null;
|
if (user == null) return null;
|
||||||
|
|
||||||
final query = _db.userMetadataEntity.select()..where((e) => e.userId.equals(id));
|
final query = _db.userMetadataEntity.select()..where((e) => e.userId.equals(id));
|
||||||
|
|
||||||
|
print("getting metadata for user $id");
|
||||||
final metadata = await query.map((row) => row.toDto()).get();
|
final metadata = await query.map((row) => row.toDto()).get();
|
||||||
|
|
||||||
return user.toDto(metadata);
|
final a = user.toDto(metadata);
|
||||||
|
print("get user $id metadata $a");
|
||||||
|
return a;
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<UserDto> upsert(UserDto user) async {
|
Future<UserDto> upsert(UserDto user) async {
|
||||||
|
|||||||
@@ -31,55 +31,71 @@ Cancelable<T?> runInIsolateGentle<T>({
|
|||||||
}
|
}
|
||||||
|
|
||||||
return workerManager.executeGentle((cancelledChecker) async {
|
return workerManager.executeGentle((cancelledChecker) async {
|
||||||
BackgroundIsolateBinaryMessenger.ensureInitialized(token);
|
await runZonedGuarded(
|
||||||
DartPluginRegistrant.ensureInitialized();
|
() async {
|
||||||
|
BackgroundIsolateBinaryMessenger.ensureInitialized(token);
|
||||||
|
DartPluginRegistrant.ensureInitialized();
|
||||||
|
|
||||||
final (isar, drift, logDb) = await Bootstrap.initDB();
|
final (isar, drift, logDb) = await Bootstrap.initDB();
|
||||||
await Bootstrap.initDomain(isar, drift, logDb, shouldBufferLogs: false);
|
await Bootstrap.initDomain(isar, drift, logDb, shouldBufferLogs: false);
|
||||||
final ref = ProviderContainer(
|
final ref = ProviderContainer(
|
||||||
overrides: [
|
overrides: [
|
||||||
// TODO: Remove once isar is removed
|
// TODO: Remove once isar is removed
|
||||||
dbProvider.overrideWithValue(isar),
|
dbProvider.overrideWithValue(isar),
|
||||||
isarProvider.overrideWithValue(isar),
|
isarProvider.overrideWithValue(isar),
|
||||||
cancellationProvider.overrideWithValue(cancelledChecker),
|
cancellationProvider.overrideWithValue(cancelledChecker),
|
||||||
driftProvider.overrideWith(driftOverride(drift)),
|
driftProvider.overrideWith(driftOverride(drift)),
|
||||||
],
|
],
|
||||||
);
|
);
|
||||||
|
|
||||||
Logger log = Logger("IsolateLogger");
|
Logger log = Logger("IsolateLogger");
|
||||||
|
|
||||||
try {
|
|
||||||
HttpSSLOptions.apply(applyNative: false);
|
|
||||||
return await computation(ref);
|
|
||||||
} on CanceledError {
|
|
||||||
log.warning("Computation cancelled ${debugLabel == null ? '' : ' for $debugLabel'}");
|
|
||||||
} catch (error, stack) {
|
|
||||||
log.severe("Error in runInIsolateGentle ${debugLabel == null ? '' : ' for $debugLabel'}", error, stack);
|
|
||||||
} finally {
|
|
||||||
try {
|
|
||||||
await LogService.I.dispose();
|
|
||||||
await logDb.close();
|
|
||||||
await ref.read(driftProvider).close();
|
|
||||||
|
|
||||||
// Close Isar safely
|
|
||||||
try {
|
try {
|
||||||
final isar = ref.read(isarProvider);
|
HttpSSLOptions.apply(applyNative: false);
|
||||||
if (isar.isOpen) {
|
return await computation(ref);
|
||||||
await isar.close();
|
} on CanceledError {
|
||||||
}
|
log.warning("Computation cancelled ${debugLabel == null ? '' : ' for $debugLabel'}");
|
||||||
} catch (e) {
|
} catch (error, stack) {
|
||||||
debugPrint("Error closing Isar: $e");
|
log.severe("Error in runInIsolateGentle ${debugLabel == null ? '' : ' for $debugLabel'}", error, stack);
|
||||||
}
|
} finally {
|
||||||
|
try {
|
||||||
|
print("1 close logs service");
|
||||||
|
await LogService.I.dispose();
|
||||||
|
|
||||||
ref.dispose();
|
print("2 close logs db");
|
||||||
} catch (error, stack) {
|
await logDb.close();
|
||||||
debugPrint("Error closing resources in isolate: $error, $stack");
|
|
||||||
} finally {
|
print("3 close drift $debugLabel");
|
||||||
ref.dispose();
|
await ref.read(driftProvider).close();
|
||||||
// Delay to ensure all resources are released
|
|
||||||
await Future.delayed(const Duration(seconds: 2));
|
// Close Isar safely
|
||||||
}
|
try {
|
||||||
}
|
print("4 close isar");
|
||||||
return null;
|
final isar = ref.read(isarProvider);
|
||||||
|
if (isar.isOpen) {
|
||||||
|
await isar.close();
|
||||||
|
}
|
||||||
|
print("5 closed isar");
|
||||||
|
} catch (e) {
|
||||||
|
debugPrint("Error closing Isar: $e");
|
||||||
|
}
|
||||||
|
|
||||||
|
print("6 dispose ref");
|
||||||
|
} catch (error, stack) {
|
||||||
|
debugPrint("Error closing resources in isolate: $error, $stack");
|
||||||
|
} finally {
|
||||||
|
print("finished isolate ${debugLabel == null ? '' : ' for $debugLabel'}");
|
||||||
|
ref.dispose();
|
||||||
|
|
||||||
|
// Delay to ensure all resources are released
|
||||||
|
await Future.delayed(const Duration(seconds: 2));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
},
|
||||||
|
(error, stackTrace) {
|
||||||
|
print("Run zoned error in isolate ${debugLabel == null ? '' : ' for $debugLabel'}: $error, $stackTrace");
|
||||||
|
},
|
||||||
|
);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user