Skip to content

Commit bbf95a6

Browse files
committed
Add new sync implementation
1 parent cf06f4a commit bbf95a6

File tree

4 files changed

+61
-9
lines changed

4 files changed

+61
-9
lines changed

packages/powersync_core/lib/src/sync/options.dart

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,43 @@ final class SyncOptions {
2424
/// When set to null, PowerSync defaults to a delay of 5 seconds.
2525
final Duration? retryDelay;
2626

27+
/// The [SyncClientImplementation] to use.
28+
final SyncClientImplementation syncImplementation;
29+
2730
const SyncOptions({
2831
this.crudThrottleTime,
2932
this.retryDelay,
3033
this.params,
34+
this.syncImplementation = SyncClientImplementation.defaultClient,
3135
});
3236
}
3337

38+
/// The PowerSync SDK offers two different implementations for receiving sync
39+
/// lines: One handling most logic in Dart, and a newer one offloading that work
40+
/// to the native PowerSync extension.
41+
enum SyncClientImplementation {
42+
/// A sync implementation that decodes and handles sync lines in Dart.
43+
@Deprecated(
44+
"Don't use SyncClientImplementation.dart directly, "
45+
"use SyncClientImplementation.defaultClient instead.",
46+
)
47+
dart,
48+
49+
/// An experimental sync implementation that parses and handles sync lines in
50+
/// the native PowerSync core extensions.
51+
///
52+
/// This implementation can be more performant than the Dart implementation,
53+
/// and supports receiving sync lines in a more efficient format.
54+
///
55+
/// Note that this option is currently experimental.
56+
@experimental
57+
rust;
58+
59+
/// The default sync client implementation to use.
60+
// ignore: deprecated_member_use_from_same_package
61+
static const defaultClient = dart;
62+
}
63+
3464
@internal
3565
extension type ResolvedSyncOptions(SyncOptions source) {
3666
Duration get crudThrottleTime =>

packages/powersync_core/lib/src/sync/streaming_sync.dart

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import 'dart:typed_data';
44

55
import 'package:http/http.dart' as http;
66
import 'package:logging/logging.dart';
7-
import 'package:logging/logging.dart';
87
import 'package:meta/meta.dart';
98
import 'package:powersync_core/src/abort_controller.dart';
109
import 'package:powersync_core/src/exceptions.dart';
@@ -450,6 +449,7 @@ class StreamingSyncImplementation implements StreamingSync {
450449
case UploadCompleted():
451450
// Only relevant for the Rust sync implementation.
452451
break;
452+
case AbortCurrentIteration():
453453
case TokenRefreshComplete():
454454
// We have a new token, so stop the iteration.
455455
shouldStopIteration = true;
@@ -585,6 +585,7 @@ typedef BucketDescription = ({
585585

586586
final class _ActiveRustStreamingIteration {
587587
final StreamingSyncImplementation sync;
588+
var _isActive = true;
588589

589590
StreamSubscription<void>? _completedUploads;
590591
final Completer<void> _completedStream = Completer();
@@ -597,6 +598,7 @@ final class _ActiveRustStreamingIteration {
597598
assert(_completedStream.isCompleted, 'Should have started streaming');
598599
await _completedStream.future;
599600
} finally {
601+
_isActive = true;
600602
_completedUploads?.cancel();
601603
await _stop();
602604
}
@@ -610,7 +612,7 @@ final class _ActiveRustStreamingIteration {
610612
final events = addBroadcast(
611613
_receiveLines(request.request), sync._nonLineSyncEvents.stream);
612614

613-
listen:
615+
loop:
614616
await for (final event in events) {
615617
switch (event) {
616618
case ReceivedLine(line: final Uint8List line):
@@ -619,10 +621,10 @@ final class _ActiveRustStreamingIteration {
619621
await _control('line_text', line);
620622
case UploadCompleted():
621623
await _control('completed_upload');
624+
case AbortCurrentIteration():
625+
break loop;
622626
case TokenRefreshComplete():
623627
await _control('refreshed_token');
624-
case AbortRequested():
625-
break listen;
626628
}
627629
}
628630
}
@@ -653,11 +655,20 @@ final class _ActiveRustStreamingIteration {
653655
_completedStream.complete(_handleLines(instruction));
654656
case UpdateSyncStatus(:final status):
655657
sync._state.updateStatus((m) => m.applyFromCore(status));
656-
case FetchCredentials():
657-
// TODO: Handle this case.
658-
throw UnimplementedError();
658+
case FetchCredentials(:final didExpire):
659+
if (didExpire) {
660+
await sync.connector.prefetchCredentials(invalidate: true);
661+
} else {
662+
sync.connector.prefetchCredentials().then((_) {
663+
if (_isActive && !sync.aborted) {
664+
sync._nonLineSyncEvents.add(const TokenRefreshComplete());
665+
}
666+
}, onError: (Object e, StackTrace s) {
667+
sync.logger.warning('Could not prefetch credentials', e, s);
668+
});
669+
}
659670
case CloseSyncStream():
660-
sync._nonLineSyncEvents.add(AbortRequested());
671+
sync._nonLineSyncEvents.add(const AbortCurrentIteration());
661672
case FlushFileSystem():
662673
await sync.adapter.flushFileSystem();
663674
case DidCompleteSync():
@@ -683,3 +694,7 @@ final class UploadCompleted implements SyncEvent {
683694
final class TokenRefreshComplete implements SyncEvent {
684695
const TokenRefreshComplete();
685696
}
697+
698+
final class AbortCurrentIteration implements SyncEvent {
699+
const AbortCurrentIteration();
700+
}

packages/powersync_core/lib/src/web/sync_worker.dart

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ class _ConnectedClient {
8080
final encodedParams =>
8181
jsonDecode(encodedParams) as Map<String, Object?>,
8282
},
83+
syncImplementation: switch (request.implementationName) {
84+
null => SyncClientImplementation.defaultClient,
85+
final name => SyncClientImplementation.values.byName(name),
86+
},
8387
);
8488

8589
_runner = _worker.referenceSyncTask(

packages/powersync_core/lib/src/web/sync_worker_protocol.dart

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,16 @@ extension type StartSynchronization._(JSObject _) implements JSObject {
6969
required String databaseName,
7070
required int crudThrottleTimeMs,
7171
required int requestId,
72-
required int? retryDelayMs,
72+
required int retryDelayMs,
73+
required String implementationName,
7374
String? syncParamsEncoded,
7475
});
7576

7677
external String get databaseName;
7778
external int get requestId;
7879
external int get crudThrottleTimeMs;
7980
external int? get retryDelayMs;
81+
external String? get implementationName;
8082
external String? get syncParamsEncoded;
8183
}
8284

@@ -417,6 +419,7 @@ final class WorkerCommunicationChannel {
417419
crudThrottleTimeMs: options.crudThrottleTime.inMilliseconds,
418420
retryDelayMs: options.retryDelay.inMilliseconds,
419421
requestId: id,
422+
implementationName: options.source.syncImplementation.name,
420423
syncParamsEncoded: switch (options.source.params) {
421424
null => null,
422425
final params => jsonEncode(params),

0 commit comments

Comments
 (0)