Skip to content

Refactor sync client and options #281

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions demos/benchmarks/lib/powersync.dart
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,15 @@ Future<String> getDatabasePath() async {

var currentConnector = BackendConnector();

const options = SyncOptions(
params: {'size_bucket': AppConfig.sizeBucket},
crudThrottleTime: Duration(milliseconds: 1),
);

Future<void> resync() async {
await db.disconnectAndClear();
timer.start(db);
db.connect(
connector: currentConnector,
params: {'size_bucket': AppConfig.sizeBucket},
crudThrottleTime: const Duration(milliseconds: 1));
db.connect(connector: currentConnector, options: options);
}

Future<void> openDatabase() async {
Expand All @@ -106,8 +108,5 @@ Future<void> openDatabase() async {
BenchmarkItem.updateItemBenchmarks();

timer.start(db);
db.connect(
connector: currentConnector,
params: {'size_bucket': AppConfig.sizeBucket},
crudThrottleTime: const Duration(milliseconds: 1));
db.connect(connector: currentConnector, options: options);
}
1 change: 1 addition & 0 deletions packages/powersync_core/lib/powersync_core.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export 'src/exceptions.dart';
export 'src/log.dart';
export 'src/open_factory.dart';
export 'src/schema.dart';
export 'src/sync/options.dart' hide ResolvedSyncOptions;
export 'src/sync/sync_status.dart'
hide BucketProgress, InternalSyncDownloadProgress;
export 'src/uuid.dart';
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import 'package:powersync_core/src/log_internal.dart';
import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart';
import 'package:powersync_core/src/open_factory/native/native_open_factory.dart';
import 'package:powersync_core/src/schema.dart';
import 'package:powersync_core/src/sync/internal_connector.dart';
import 'package:powersync_core/src/sync/options.dart';
import 'package:powersync_core/src/sync/streaming_sync.dart';
import 'package:powersync_core/src/sync/sync_status.dart';
import 'package:sqlite_async/sqlite3_common.dart';
Expand Down Expand Up @@ -118,10 +120,9 @@ class PowerSyncDatabaseImpl
@internal
Future<void> connectInternal({
required PowerSyncBackendConnector connector,
required Duration crudThrottleTime,
required SyncOptions options,
required AbortController abort,
required Zone asyncWorkZone,
Map<String, dynamic>? params,
}) async {
final dbRef = database.isolateConnectionFactory();

Expand All @@ -134,6 +135,7 @@ class PowerSyncDatabaseImpl
SendPort? initPort;
final hasInitPort = Completer<void>();
final receivedIsolateExit = Completer<void>();
final resolved = ResolvedSyncOptions(options);

Future<void> waitForShutdown() async {
// Only complete the abortion signal after the isolate shuts down. This
Expand Down Expand Up @@ -161,22 +163,27 @@ class PowerSyncDatabaseImpl
Future<void> handleMessage(Object? data) async {
if (data is List) {
String action = data[0] as String;
if (action == "getCredentials") {
if (action == "getCredentialsCached") {
await (data[1] as PortCompleter).handle(() async {
final token = await connector.getCredentialsCached();
logger.fine('Credentials: $token');
return token;
});
} else if (action == "invalidateCredentials") {
} else if (action == "prefetchCredentials") {
logger.fine('Refreshing credentials');
final invalidate = data[2] as bool;

await (data[1] as PortCompleter).handle(() async {
await connector.prefetchCredentials();
if (invalidate) {
connector.invalidateCredentials();
}
return await connector.prefetchCredentials();
});
} else if (action == 'init') {
final port = initPort = data[1] as SendPort;
hasInitPort.complete();
var crudStream =
database.onChange(['ps_crud'], throttle: crudThrottleTime);
var crudStream = database
.onChange(['ps_crud'], throttle: resolved.crudThrottleTime);
crudUpdateSubscription = crudStream.listen((event) {
port.send(['update']);
});
Expand Down Expand Up @@ -238,8 +245,7 @@ class PowerSyncDatabaseImpl
_PowerSyncDatabaseIsolateArgs(
receiveMessages.sendPort,
dbRef,
retryDelay,
clientParams,
resolved,
crudMutex.shared,
syncMutex.shared,
),
Expand Down Expand Up @@ -282,16 +288,14 @@ class PowerSyncDatabaseImpl
class _PowerSyncDatabaseIsolateArgs {
final SendPort sPort;
final IsolateConnectionFactory dbRef;
final Duration retryDelay;
final Map<String, dynamic>? parameters;
final ResolvedSyncOptions options;
final SerializedMutex crudMutex;
final SerializedMutex syncMutex;

_PowerSyncDatabaseIsolateArgs(
this.sPort,
this.dbRef,
this.retryDelay,
this.parameters,
this.options,
this.crudMutex,
this.syncMutex,
);
Expand Down Expand Up @@ -362,15 +366,16 @@ Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
sPort.send(['log', copy]);
});

Future<PowerSyncCredentials?> loadCredentials() async {
Future<PowerSyncCredentials?> getCredentialsCached() async {
final r = IsolateResult<PowerSyncCredentials?>();
sPort.send(['getCredentials', r.completer]);
sPort.send(['getCredentialsCached', r.completer]);
return r.future;
}

Future<void> invalidateCredentials() async {
final r = IsolateResult<void>();
sPort.send(['invalidateCredentials', r.completer]);
Future<PowerSyncCredentials?> prefetchCredentials(
{required bool invalidate}) async {
final r = IsolateResult<PowerSyncCredentials?>();
sPort.send(['prefetchCredentials', r.completer, invalidate]);
return r.future;
}

Expand All @@ -388,13 +393,14 @@ Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
final storage = BucketStorage(connection);
final sync = StreamingSyncImplementation(
adapter: storage,
credentialsCallback: loadCredentials,
invalidCredentialsCallback: invalidateCredentials,
uploadCrud: uploadCrud,
connector: InternalConnector(
getCredentialsCached: getCredentialsCached,
prefetchCredentials: prefetchCredentials,
uploadCrud: uploadCrud,
),
crudUpdateTriggerStream: crudUpdateController.stream,
retryDelay: args.retryDelay,
options: args.options,
client: http.Client(),
syncParameters: args.parameters,
crudMutex: crudMutex,
syncMutex: syncMutex,
);
Expand Down Expand Up @@ -429,6 +435,6 @@ Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
// This should be rare - any uncaught error is a bug. And in most cases,
// it should occur after the database is already open.
await shutdown();
throw error;
Error.throwWithStackTrace(error, stack);
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import 'package:powersync_core/sqlite_async.dart';
import 'package:powersync_core/src/abort_controller.dart';
import 'package:powersync_core/src/database/powersync_db_mixin.dart';
import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart';
import '../sync/options.dart';
import 'powersync_database.dart';

import '../connector.dart';
Expand Down Expand Up @@ -110,12 +111,12 @@ class PowerSyncDatabaseImpl

@override
@internal
Future<void> connectInternal(
{required PowerSyncBackendConnector connector,
required Duration crudThrottleTime,
required AbortController abort,
required Zone asyncWorkZone,
Map<String, dynamic>? params}) {
Future<void> connectInternal({
required PowerSyncBackendConnector connector,
required AbortController abort,
required Zone asyncWorkZone,
required SyncOptions options,
}) {
throw UnimplementedError();
}

Expand Down
28 changes: 20 additions & 8 deletions packages/powersync_core/lib/src/database/powersync_db_mixin.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import 'package:powersync_core/src/powersync_update_notification.dart';
import 'package:powersync_core/src/schema.dart';
import 'package:powersync_core/src/schema_logic.dart';
import 'package:powersync_core/src/schema_logic.dart' as schema_logic;
import 'package:powersync_core/src/sync/options.dart';
import 'package:powersync_core/src/sync/sync_status.dart';

mixin PowerSyncDatabaseMixin implements SqliteConnection {
Expand All @@ -37,6 +38,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
/// Use [attachedLogger] to propagate logs to [Logger.root] for custom logging.
Logger get logger;

@Deprecated("This field is unused, pass params to connect() instead")
Map<String, dynamic>? clientParams;

/// Current connection status.
Expand Down Expand Up @@ -72,6 +74,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
/// Delay between retrying failed requests.
/// Defaults to 5 seconds.
/// Only has an effect if changed before calling [connect].
@Deprecated('Set option when calling connect() instead')
Duration retryDelay = const Duration(seconds: 5);

@protected
Expand Down Expand Up @@ -269,17 +272,31 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
///
/// The connection is automatically re-opened if it fails for any reason.
///
/// To set sync parameters used in your sync rules (if any), use
/// [SyncOptions.params]. [SyncOptions] can also be used to tune the behavior
/// of the sync client, see that class for more information.
///
/// Status changes are reported on [statusStream].
Future<void> connect({
required PowerSyncBackendConnector connector,
Duration crudThrottleTime = const Duration(milliseconds: 10),
SyncOptions? options,
@Deprecated('Use SyncOptions.crudThrottleTime instead')
Duration? crudThrottleTime,
Map<String, dynamic>? params,
}) async {
// The initialization process acquires a sync connect lock (through
// updateSchema), so ensure the database is ready before we try to acquire
// the lock for the connection.
await initialize();

final resolvedOptions = SyncOptions(
crudThrottleTime: options?.crudThrottleTime ?? crudThrottleTime,
// ignore: deprecated_member_use_from_same_package
retryDelay: options?.retryDelay ?? retryDelay,
params: options?.params ?? params,
);

// ignore: deprecated_member_use_from_same_package
clientParams = params;
var thisConnectAborter = AbortController();
final zone = Zone.current;
Expand All @@ -294,8 +311,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {

await connectInternal(
connector: connector,
crudThrottleTime: crudThrottleTime,
params: params,
options: resolvedOptions,
abort: thisConnectAborter,
// Run follow-up async tasks in the parent zone, a new one is introduced
// while we hold the lock (and async tasks won't hold the sync lock).
Expand Down Expand Up @@ -342,17 +358,13 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
/// [connect] method and should not be called elsewhere.
/// This method will only be called internally when no other sync client is
/// active, so the method should not call [disconnect] itself.
///
/// The [crudThrottleTime] is the throttle time between CRUD operations, it
/// defaults to 10 milliseconds in [connect].
@protected
@internal
Future<void> connectInternal({
required PowerSyncBackendConnector connector,
required Duration crudThrottleTime,
required SyncOptions options,
required AbortController abort,
required Zone asyncWorkZone,
Map<String, dynamic>? params,
});

/// Close the sync connection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import 'package:powersync_core/src/log.dart';
import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart';
import 'package:powersync_core/src/open_factory/web/web_open_factory.dart';
import 'package:powersync_core/src/schema.dart';
import 'package:powersync_core/src/sync/internal_connector.dart';
import 'package:powersync_core/src/sync/streaming_sync.dart';
import 'package:sqlite_async/sqlite_async.dart';

import '../../sync/options.dart';
import '../../web/sync_controller.dart';

/// A PowerSync managed database.
Expand Down Expand Up @@ -114,13 +116,11 @@ class PowerSyncDatabaseImpl
@internal
Future<void> connectInternal({
required PowerSyncBackendConnector connector,
required Duration crudThrottleTime,
required AbortController abort,
required Zone asyncWorkZone,
Map<String, dynamic>? params,
required SyncOptions options,
}) async {
final crudStream =
database.onChange(['ps_crud'], throttle: crudThrottleTime);
final resolved = ResolvedSyncOptions(options);

final storage = BucketStorage(database);
StreamingSync sync;
Expand All @@ -130,25 +130,23 @@ class PowerSyncDatabaseImpl
sync = await SyncWorkerHandle.start(
database: this,
connector: connector,
crudThrottleTimeMs: crudThrottleTime.inMilliseconds,
options: options,
workerUri: Uri.base.resolve('/powersync_sync.worker.js'),
syncParams: params,
);
} catch (e) {
logger.warning(
'Could not use shared worker for synchronization, falling back to locks.',
e,
);
final crudStream =
database.onChange(['ps_crud'], throttle: resolved.crudThrottleTime);

sync = StreamingSyncImplementation(
adapter: storage,
credentialsCallback: connector.getCredentialsCached,
invalidCredentialsCallback: connector.prefetchCredentials,
uploadCrud: () => connector.uploadData(this),
connector: InternalConnector.wrap(connector, this),
crudUpdateTriggerStream: crudStream,
retryDelay: Duration(seconds: 3),
options: resolved,
client: BrowserClient(),
syncParameters: params,
// Only allows 1 sync implementation to run at a time per database
// This should be global (across tabs) when using Navigator locks.
identifier: database.openFactory.path,
Expand Down
Loading