Skip to content

WIP: Manual schema management API #290

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: raw-tables
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';
import 'package:meta/meta.dart';

Expand Down Expand Up @@ -44,6 +45,9 @@ class PowerSyncDatabaseImpl
@override
SqliteDatabase database;

@override
bool manualSchemaManagement;

@override
@protected
late Future<void> isInitialized;
Expand Down Expand Up @@ -76,15 +80,21 @@ class PowerSyncDatabaseImpl
required String path,
int maxReaders = SqliteDatabase.defaultMaxReaders,
Logger? logger,
bool manualSchemaManagement = false,
@Deprecated("Use [PowerSyncDatabase.withFactory] instead.")
// ignore: deprecated_member_use_from_same_package
SqliteConnectionSetup? sqliteSetup}) {
// ignore: deprecated_member_use_from_same_package
DefaultSqliteOpenFactory factory =
// ignore: deprecated_member_use_from_same_package
PowerSyncOpenFactory(path: path, sqliteSetup: sqliteSetup);
return PowerSyncDatabaseImpl.withFactory(factory,
schema: schema, maxReaders: maxReaders, logger: logger);
return PowerSyncDatabaseImpl.withFactory(
factory,
schema: schema,
maxReaders: maxReaders,
logger: logger,
manualSchemaManagement: manualSchemaManagement,
);
}

/// Open a [PowerSyncDatabase] with a [PowerSyncOpenFactory].
Expand All @@ -96,22 +106,32 @@ class PowerSyncDatabaseImpl
///
/// [logger] defaults to [autoLogger], which logs to the console in debug builds.
factory PowerSyncDatabaseImpl.withFactory(
DefaultSqliteOpenFactory openFactory,
{required Schema schema,
int maxReaders = SqliteDatabase.defaultMaxReaders,
Logger? logger}) {
DefaultSqliteOpenFactory openFactory, {
required Schema schema,
int maxReaders = SqliteDatabase.defaultMaxReaders,
Logger? logger,
bool manualSchemaManagement = false,
}) {
final db = SqliteDatabase.withFactory(openFactory, maxReaders: maxReaders);
return PowerSyncDatabaseImpl.withDatabase(
schema: schema, database: db, logger: logger);
schema: schema,
database: db,
logger: logger,
manualSchemaManagement: manualSchemaManagement,
);
}

/// Open a PowerSyncDatabase on an existing [SqliteDatabase].
///
/// Migrations are run on the database when this constructor is called.
///
/// [logger] defaults to [autoLogger], which logs to the console in debug builds.s
PowerSyncDatabaseImpl.withDatabase(
{required this.schema, required this.database, Logger? logger}) {
PowerSyncDatabaseImpl.withDatabase({
required this.schema,
required this.database,
Logger? logger,
this.manualSchemaManagement = false,
}) {
this.logger = logger ?? autoLogger;
isInitialized = baseInit();
}
Expand Down Expand Up @@ -247,6 +267,7 @@ class PowerSyncDatabaseImpl
options,
crudMutex.shared,
syncMutex.shared,
jsonEncode(schema),
),
debugName: 'Sync ${database.openFactory.path}',
onError: receiveUnhandledErrors.sendPort,
Expand Down Expand Up @@ -290,13 +311,15 @@ class _PowerSyncDatabaseIsolateArgs {
final ResolvedSyncOptions options;
final SerializedMutex crudMutex;
final SerializedMutex syncMutex;
final String schemaJson;

_PowerSyncDatabaseIsolateArgs(
this.sPort,
this.dbRef,
this.options,
this.crudMutex,
this.syncMutex,
this.schemaJson,
);
}

Expand Down Expand Up @@ -392,6 +415,7 @@ Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
final storage = BucketStorage(connection);
final sync = StreamingSyncImplementation(
adapter: storage,
schemaJson: args.schemaJson,
connector: InternalConnector(
getCredentialsCached: getCredentialsCached,
prefetchCredentials: prefetchCredentials,
Expand Down
64 changes: 41 additions & 23 deletions packages/powersync_core/lib/src/database/powersync_database.dart
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,23 @@ abstract class PowerSyncDatabase
/// A maximum of [maxReaders] concurrent read transactions are allowed.
///
/// [logger] defaults to [autoLogger], which logs to the console in debug builds.
factory PowerSyncDatabase(
{required Schema schema,
required String path,
Logger? logger,
@Deprecated("Use [PowerSyncDatabase.withFactory] instead.")
// ignore: deprecated_member_use_from_same_package
SqliteConnectionSetup? sqliteSetup}) {
factory PowerSyncDatabase({
required Schema schema,
required String path,
Logger? logger,
bool manualSchemaManagement = false,
@Deprecated("Use [PowerSyncDatabase.withFactory] instead.")
// ignore: deprecated_member_use_from_same_package
SqliteConnectionSetup? sqliteSetup,
}) {
return PowerSyncDatabaseImpl(
schema: schema,
path: path,
logger: logger,
// ignore: deprecated_member_use_from_same_package
sqliteSetup: sqliteSetup);
schema: schema,
path: path,
manualSchemaManagement: manualSchemaManagement,
logger: logger,
// ignore: deprecated_member_use_from_same_package
sqliteSetup: sqliteSetup,
);
}

/// Open a [PowerSyncDatabase] with a [PowerSyncOpenFactory].
Expand All @@ -55,24 +59,38 @@ abstract class PowerSyncDatabase
/// Subclass [PowerSyncOpenFactory] to add custom logic to this process.
///
/// [logger] defaults to [autoLogger], which logs to the console in debug builds.
factory PowerSyncDatabase.withFactory(DefaultSqliteOpenFactory openFactory,
{required Schema schema,
int maxReaders = SqliteDatabase.defaultMaxReaders,
Logger? logger}) {
return PowerSyncDatabaseImpl.withFactory(openFactory,
schema: schema, maxReaders: maxReaders, logger: logger);
factory PowerSyncDatabase.withFactory(
DefaultSqliteOpenFactory openFactory, {
required Schema schema,
int maxReaders = SqliteDatabase.defaultMaxReaders,
bool manualSchemaManagement = false,
Logger? logger,
}) {
return PowerSyncDatabaseImpl.withFactory(
openFactory,
schema: schema,
maxReaders: maxReaders,
manualSchemaManagement: manualSchemaManagement,
logger: logger,
);
}

/// Open a PowerSyncDatabase on an existing [SqliteDatabase].
///
/// Migrations are run on the database when this constructor is called.
///
/// [logger] defaults to [autoLogger], which logs to the console in debug builds.
factory PowerSyncDatabase.withDatabase(
{required Schema schema,
required SqliteDatabase database,
Logger? loggers}) {
factory PowerSyncDatabase.withDatabase({
required Schema schema,
required SqliteDatabase database,
bool manualSchemaManagement = false,
Logger? logger,
}) {
return PowerSyncDatabaseImpl.withDatabase(
schema: schema, database: database, logger: loggers);
schema: schema,
database: database,
manualSchemaManagement: manualSchemaManagement,
logger: logger,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ class PowerSyncDatabaseImpl
@override
SqliteDatabase get database => throw UnimplementedError();

@override
bool get manualSchemaManagement => throw UnimplementedError();

@override
Future<void> get isInitialized => throw UnimplementedError();

Expand All @@ -53,6 +56,7 @@ class PowerSyncDatabaseImpl
{required Schema schema,
required String path,
int maxReaders = SqliteDatabase.defaultMaxReaders,
bool manualSchemaManagement = false,
Logger? logger,
@Deprecated("Use [PowerSyncDatabase.withFactory] instead.")
// ignore: deprecated_member_use_from_same_package
Expand All @@ -72,6 +76,7 @@ class PowerSyncDatabaseImpl
DefaultSqliteOpenFactory openFactory, {
required Schema schema,
int maxReaders = SqliteDatabase.defaultMaxReaders,
bool manualSchemaManagement = false,
Logger? logger,
}) {
throw UnimplementedError();
Expand All @@ -82,10 +87,12 @@ class PowerSyncDatabaseImpl
/// Migrations are run on the database when this constructor is called.
///
/// [logger] defaults to [autoLogger], which logs to the console in debug builds.s
factory PowerSyncDatabaseImpl.withDatabase(
{required Schema schema,
required SqliteDatabase database,
Logger? logger}) {
factory PowerSyncDatabaseImpl.withDatabase({
required Schema schema,
required SqliteDatabase database,
bool manualSchemaManagement = false,
Logger? logger,
}) {
throw UnimplementedError();
}

Expand Down
38 changes: 37 additions & 1 deletion packages/powersync_core/lib/src/database/powersync_db_mixin.dart
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
/// Use [attachedLogger] to propagate logs to [Logger.root] for custom logging.
Logger get logger;

bool get manualSchemaManagement;

bool _manualSchemaManagementCompleted = false;

@Deprecated("This field is unused, pass params to connect() instead")
Map<String, dynamic>? clientParams;

Expand Down Expand Up @@ -110,10 +114,36 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
statusStream = statusStreamController.stream;
updates = powerSyncUpdateNotifications(database.updates);

_manualSchemaManagementCompleted = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not super happy with this design, but I don't see a better option either.

One thing that looks cleaner might be to have users open their own databases (from sqlite_async), initialize the schema there and then pass the database to PowerSyncDatabase.withDatabase. But that would make the database tricky to use with wrappers like drift because we can't swap out the earlier database with the other one later.
So this might be the best option we have here, but I'll ping @rkistner for API design advice as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One issue I can see with that is that the developer wouldn't be able to call powersyncDB.updateSchema() because they can only use the sqlite_async API at that point


await database.initialize();
await _checkVersion();
await database.execute('SELECT powersync_init()');
await updateSchema(schema);

if (!manualSchemaManagement) {
// Create the internal db schema
await updateSchema(schema);
await _afterSchemaReady();
}
}

Future<void> markSchemaAsReady() async {
await isInitialized;
_manualSchemaManagementCompleted = true;

await _afterSchemaReady();
}

void _checkSchemaIsReady() {
if (!manualSchemaManagement || _manualSchemaManagementCompleted) {
return;
}

throw StateError(
'In manual schema management mode, you need to mark the powersync database as ready');
}

Future<void> _afterSchemaReady() async {
await _updateHasSynced();
}

Expand Down Expand Up @@ -289,6 +319,8 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
// the lock for the connection.
await initialize();

_checkSchemaIsReady();

final resolvedOptions = ResolvedSyncOptions.resolve(
options,
crudThrottleTime: crudThrottleTime,
Expand Down Expand Up @@ -452,13 +484,15 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
/// Get an unique id for this client.
/// This id is only reset when the database is deleted.
Future<String> getClientId() async {
_checkSchemaIsReady(); // TODO(skilldevs): Needed?
final row = await get('SELECT powersync_client_id() as client_id');
return row['client_id'] as String;
}

/// Get upload queue size estimate and count.
Future<UploadQueueStats> getUploadQueueStats(
{bool includeSize = false}) async {
_checkSchemaIsReady();
if (includeSize) {
final row = await getOptional(
'SELECT SUM(cast(data as blob) + 20) as size, count(*) as count FROM ps_crud');
Expand Down Expand Up @@ -486,6 +520,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
/// data by transaction. One batch may contain data from multiple transactions,
/// and a single transaction may be split over multiple batches.
Future<CrudBatch?> getCrudBatch({int limit = 100}) async {
_checkSchemaIsReady();
final rows = await getAll(
'SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT ?',
[limit + 1]);
Expand Down Expand Up @@ -532,6 +567,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
/// Unlike [getCrudBatch], this only returns data from a single transaction at a time.
/// All data for the transaction is loaded into memory.
Future<CrudTransaction?> getNextCrudTransaction() async {
_checkSchemaIsReady();
return await readTransaction((tx) async {
final first = await tx.getOptional(
'SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT 1');
Expand Down
Loading