From de6d689c0044de55c0e90c27e0a799a6962a2ce0 Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Wed, 10 Dec 2025 16:58:00 +0100 Subject: [PATCH 1/7] Updated tracking / transitions of the task's version state. --- app/lib/task/backend.dart | 16 +++-------- app/lib/task/models.dart | 54 ++++++++++++++++++++++++++++++++++++- app/lib/task/models.g.dart | 10 ++++--- app/lib/task/scheduler.dart | 25 ++++------------- 4 files changed, 68 insertions(+), 37 deletions(-) diff --git a/app/lib/task/backend.dart b/app/lib/task/backend.dart index 3344bf7382..178b8d90cf 100644 --- a/app/lib/task/backend.dart +++ b/app/lib/task/backend.dart @@ -695,17 +695,9 @@ class TaskBackend { zone = versionState.zone!; instance = versionState.instance!; - - // Remove instanceName, zone, secretToken, and set attempts = 0 - state.versions![version] = PackageVersionStateInfo( - scheduled: versionState.scheduled, + state.versions![version] = versionState.complete( docs: hasDocIndexHtml, pana: summary != null, - finished: true, - attempts: 0, - instance: null, // version is no-longer running on this instance - secretToken: null, // TODO: Consider retaining this for idempotency - zone: null, ); // Determine if something else was running on the instance @@ -1008,13 +1000,12 @@ class TaskBackend { await for (final state in _db.tasks.listAllForCurrentRuntime()) { final zone = taskWorkerCloudCompute.zones.first; // ignore: invalid_use_of_visible_for_testing_member - final updated = await updatePackageStateWithPendingVersions( + final payload = await updatePackageStateWithPendingVersions( _db, state.package, zone, taskWorkerCloudCompute.generateInstanceName(), ); - final payload = updated?.$1; if (payload == null) continue; await processPayload(payload); } @@ -1424,7 +1415,6 @@ final class _TaskDataAccess { Future restorePreviousVersionsState( String packageName, String instanceName, - Map previousVersionsMap, ) async { await withRetryTransaction(_db, (tx) async { final s = await tx.tasks.lookupOrNull(packageName); @@ -1435,7 +1425,7 @@ final class _TaskDataAccess { s.versions!.addEntries( s.versions!.entries .where((e) => e.value.instance == instanceName) - .map((e) => MapEntry(e.key, previousVersionsMap[e.key]!)), + .map((e) => MapEntry(e.key, e.value.resetAfterFailedAttempt())), ); s.pendingAt = derivePendingAt( versions: s.versions!, diff --git a/app/lib/task/models.dart b/app/lib/task/models.dart index 1ccd3bba4c..fc689e7a62 100644 --- a/app/lib/task/models.dart +++ b/app/lib/task/models.dart @@ -3,6 +3,7 @@ // BSD-style license that can be found in the LICENSE file. import 'dart:convert' show json; +import 'dart:math'; import 'package:clock/clock.dart'; import 'package:json_annotation/json_annotation.dart'; @@ -249,7 +250,7 @@ List derivePendingVersions({ } /// State of a given `version` within a [PackageState]. -@JsonSerializable() +@JsonSerializable(includeIfNull: false) class PackageVersionStateInfo { PackageVersionStatus get status { if (attempts == 0 && scheduled == initialTimestamp) { @@ -319,6 +320,9 @@ class PackageVersionStateInfo { /// comparison. Please use [isAuthorized] for validating a request. final String? secretToken; + /// The previous scheduled timestamp (if we are currently in an active schedule). + final DateTime? previousScheduled; + /// Return true, if [token] matches [secretToken] and it has not expired. /// /// This does a fixed-time comparison to mitigate timing attacks. @@ -347,6 +351,7 @@ class PackageVersionStateInfo { this.docs = false, this.pana = false, this.finished = false, + this.previousScheduled, }); factory PackageVersionStateInfo.fromJson(Map m) => @@ -364,6 +369,53 @@ class PackageVersionStateInfo { 'secretToken: $secretToken', ].join(', ') + ')'; + + // Remove instanceName, zone, secretToken, and set attempts = 0 + PackageVersionStateInfo complete({required bool pana, required bool docs}) { + return PackageVersionStateInfo( + scheduled: scheduled, + attempts: 0, + docs: docs, + pana: pana, + finished: true, + zone: null, + instance: null, // version is no-longer running on this instance + secretToken: null, // TODO: Consider retaining this for idempotency + previousScheduled: null, + ); + } + + /// Derives a new version state with scheduling information. + PackageVersionStateInfo scheduleNew({ + required String zone, + required String instanceName, + }) { + return PackageVersionStateInfo( + scheduled: clock.now(), + attempts: attempts + 1, + zone: zone, + instance: instanceName, + secretToken: createUuid(), + finished: finished, + docs: docs, + pana: pana, + previousScheduled: scheduled, + ); + } + + /// Reverts the status of the last scheduling attempt, which has presumably failed. + PackageVersionStateInfo resetAfterFailedAttempt() { + return PackageVersionStateInfo( + scheduled: previousScheduled ?? initialTimestamp, + attempts: max(0, attempts - 1), + zone: null, + instance: null, + secretToken: null, + finished: finished, + docs: docs, + pana: pana, + ); + } } /// A [db.Property] encoding a Map from version to [PackageVersionStateInfo] as JSON. diff --git a/app/lib/task/models.g.dart b/app/lib/task/models.g.dart index d1b3d8816a..9854fbc072 100644 --- a/app/lib/task/models.g.dart +++ b/app/lib/task/models.g.dart @@ -17,6 +17,9 @@ PackageVersionStateInfo _$PackageVersionStateInfoFromJson( docs: json['docs'] as bool? ?? false, pana: json['pana'] as bool? ?? false, finished: json['finished'] as bool? ?? false, + previousScheduled: json['previousScheduled'] == null + ? null + : DateTime.parse(json['previousScheduled'] as String), ); Map _$PackageVersionStateInfoToJson( @@ -27,9 +30,10 @@ Map _$PackageVersionStateInfoToJson( 'finished': instance.finished, 'scheduled': instance.scheduled.toIso8601String(), 'attempts': instance.attempts, - 'zone': instance.zone, - 'instance': instance.instance, - 'secretToken': instance.secretToken, + 'zone': ?instance.zone, + 'instance': ?instance.instance, + 'secretToken': ?instance.secretToken, + 'previousScheduled': ?instance.previousScheduled?.toIso8601String(), }; PackageStateInfo _$PackageStateInfoFromJson(Map json) => diff --git a/app/lib/task/scheduler.dart b/app/lib/task/scheduler.dart index 4ec22c2e40..6fd5f4f838 100644 --- a/app/lib/task/scheduler.dart +++ b/app/lib/task/scheduler.dart @@ -10,7 +10,6 @@ import 'package:meta/meta.dart'; import 'package:pub_dev/package/backend.dart'; import 'package:pub_dev/shared/configuration.dart'; import 'package:pub_dev/shared/datastore.dart'; -import 'package:pub_dev/shared/utils.dart'; import 'package:pub_dev/task/backend.dart'; import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; import 'package:pub_dev/task/models.dart'; @@ -101,13 +100,12 @@ Future<(CreateInstancesState, Duration)> runOneCreateInstancesCycle( final instanceName = compute.generateInstanceName(); final zone = pickZone(); - final updated = await updatePackageStateWithPendingVersions( + final payload = await updatePackageStateWithPendingVersions( db, selected.package, zone, instanceName, ); - final payload = updated?.$1; if (payload == null) { return; } @@ -174,15 +172,13 @@ Future<(CreateInstancesState, Duration)> runOneCreateInstancesCycle( banZone(zone, minutes: 15); } if (rollbackPackageState) { - final oldVersionsMap = updated?.$2 ?? const {}; - // Restore the state of the PackageState for versions that were + // Restire the state of the PackageState for versions that were // suppose to run on the instance we just failed to create. // If this doesn't work, we'll eventually retry. Hence, correctness // does not hinge on this transaction being successful. await db.tasks.restorePreviousVersionsState( selected.package, instanceName, - oldVersionsMap, ); } } @@ -221,11 +217,8 @@ Future<(CreateInstancesState, Duration)> runOneCreateInstancesCycle( /// Updates the package state with versions that are already pending or /// will be pending soon. -/// -/// Returns the payload and the old status of the state info version map @visibleForTesting -Future<(Payload, Map)?> -updatePackageStateWithPendingVersions( +Future updatePackageStateWithPendingVersions( DatastoreDB db, String package, String zone, @@ -237,7 +230,6 @@ updatePackageStateWithPendingVersions( // presumably the package was deleted. return null; } - final oldVersionsMap = {...?s.versions}; final now = clock.now(); final pendingVersions = derivePendingVersions( @@ -253,14 +245,7 @@ updatePackageStateWithPendingVersions( // Update PackageState s.versions!.addAll({ for (final v in pendingVersions.map((v) => v.toString())) - v: PackageVersionStateInfo( - scheduled: now, - attempts: s.versions![v]!.attempts + 1, - zone: zone, - instance: instanceName, - secretToken: createUuid(), - finished: s.versions![v]!.finished, - ), + v: s.versions![v]!.scheduleNew(zone: zone, instanceName: instanceName), }); s.pendingAt = derivePendingAt( versions: s.versions!, @@ -279,6 +264,6 @@ updatePackageStateWithPendingVersions( ), ), ); - return (payload, oldVersionsMap); + return payload; }); } From 95fe45cdffabc6383a80dc7037832db53f560ca7 Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Fri, 19 Dec 2025 12:25:15 +0100 Subject: [PATCH 2/7] Do not need previousScheduled --- app/lib/task/models.dart | 8 +------- app/lib/task/models.g.dart | 4 ---- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/app/lib/task/models.dart b/app/lib/task/models.dart index fc689e7a62..1a534da1cd 100644 --- a/app/lib/task/models.dart +++ b/app/lib/task/models.dart @@ -320,9 +320,6 @@ class PackageVersionStateInfo { /// comparison. Please use [isAuthorized] for validating a request. final String? secretToken; - /// The previous scheduled timestamp (if we are currently in an active schedule). - final DateTime? previousScheduled; - /// Return true, if [token] matches [secretToken] and it has not expired. /// /// This does a fixed-time comparison to mitigate timing attacks. @@ -351,7 +348,6 @@ class PackageVersionStateInfo { this.docs = false, this.pana = false, this.finished = false, - this.previousScheduled, }); factory PackageVersionStateInfo.fromJson(Map m) => @@ -381,7 +377,6 @@ class PackageVersionStateInfo { zone: null, instance: null, // version is no-longer running on this instance secretToken: null, // TODO: Consider retaining this for idempotency - previousScheduled: null, ); } @@ -399,14 +394,13 @@ class PackageVersionStateInfo { finished: finished, docs: docs, pana: pana, - previousScheduled: scheduled, ); } /// Reverts the status of the last scheduling attempt, which has presumably failed. PackageVersionStateInfo resetAfterFailedAttempt() { return PackageVersionStateInfo( - scheduled: previousScheduled ?? initialTimestamp, + scheduled: initialTimestamp, attempts: max(0, attempts - 1), zone: null, instance: null, diff --git a/app/lib/task/models.g.dart b/app/lib/task/models.g.dart index 9854fbc072..121f23e3cc 100644 --- a/app/lib/task/models.g.dart +++ b/app/lib/task/models.g.dart @@ -17,9 +17,6 @@ PackageVersionStateInfo _$PackageVersionStateInfoFromJson( docs: json['docs'] as bool? ?? false, pana: json['pana'] as bool? ?? false, finished: json['finished'] as bool? ?? false, - previousScheduled: json['previousScheduled'] == null - ? null - : DateTime.parse(json['previousScheduled'] as String), ); Map _$PackageVersionStateInfoToJson( @@ -33,7 +30,6 @@ Map _$PackageVersionStateInfoToJson( 'zone': ?instance.zone, 'instance': ?instance.instance, 'secretToken': ?instance.secretToken, - 'previousScheduled': ?instance.previousScheduled?.toIso8601String(), }; PackageStateInfo _$PackageStateInfoFromJson(Map json) => From 6a43100b7c8fd627a24fdf653d32a1be657b91f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Istv=C3=A1n=20So=C3=B3s?= Date: Fri, 19 Dec 2025 12:06:42 +0100 Subject: [PATCH 3/7] Update app/lib/task/models.dart Co-authored-by: Jonas Finnemann Jensen --- app/lib/task/models.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/lib/task/models.dart b/app/lib/task/models.dart index 1a534da1cd..91eaaebc19 100644 --- a/app/lib/task/models.dart +++ b/app/lib/task/models.dart @@ -366,7 +366,7 @@ class PackageVersionStateInfo { ].join(', ') + ')'; - // Remove instanceName, zone, secretToken, and set attempts = 0 + /// Remove instanceName, zone, secretToken, and set attempts = 0 PackageVersionStateInfo complete({required bool pana, required bool docs}) { return PackageVersionStateInfo( scheduled: scheduled, From 73e5dc0aa8b122e58c8c5fb7da7518be79365e45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Istv=C3=A1n=20So=C3=B3s?= Date: Fri, 19 Dec 2025 12:07:52 +0100 Subject: [PATCH 4/7] Update app/lib/task/models.dart Co-authored-by: Jonas Finnemann Jensen --- app/lib/task/models.dart | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/app/lib/task/models.dart b/app/lib/task/models.dart index 91eaaebc19..c0f2ff4bb0 100644 --- a/app/lib/task/models.dart +++ b/app/lib/task/models.dart @@ -384,6 +384,20 @@ class PackageVersionStateInfo { PackageVersionStateInfo scheduleNew({ required String zone, required String instanceName, + /// Create updated [PackageVersionStateInfo] for when a new instance have been + /// scheduled. + /// + /// You must supply: + /// * [scheduled] when the instance was scheduled. + /// * [zone] within which the instance was scheduled. + /// * [instanceName] as name of the of the isntance scheduled. + /// * [secretToken] passed to the instance for authentication when + /// the instance wants to callback. + PackageVersionStateInfo scheduleNew({ + required DateTime scheduled, + required String zone, + required String instanceName, + required String secretToken, }) { return PackageVersionStateInfo( scheduled: clock.now(), From 9d33899e318c48429bd9879905769b1d34ed7a60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Istv=C3=A1n=20So=C3=B3s?= Date: Fri, 19 Dec 2025 12:08:41 +0100 Subject: [PATCH 5/7] Update app/lib/task/models.dart Co-authored-by: Jonas Finnemann Jensen --- app/lib/task/models.dart | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/app/lib/task/models.dart b/app/lib/task/models.dart index c0f2ff4bb0..ec9f650c90 100644 --- a/app/lib/task/models.dart +++ b/app/lib/task/models.dart @@ -411,8 +411,18 @@ class PackageVersionStateInfo { ); } - /// Reverts the status of the last scheduling attempt, which has presumably failed. - PackageVersionStateInfo resetAfterFailedAttempt() { + /// Reverts the status of the last scheduling attempt, when it is known that scheduling failed. + /// + /// > [!WARNING] + /// > This state transition **may only** be used if it's + /// > **known with certainty** that scheduling failed. + /// > + /// > If an instance _may_ have been scheduled, but we suspect + /// > scheduling failed, we have to wait for a retry. + /// > As we otherwise risk leaving an instance unable to call back, + /// > which will leave the instance logging errors that indicate + /// > internal errors in our system. + PackageVersionStateInfo resetAfterFailedAttempt({required DateTime previousScheduled}) { return PackageVersionStateInfo( scheduled: initialTimestamp, attempts: max(0, attempts - 1), From 3f20bac95f878972f5ae5d753fb621e7a6cd825a Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Fri, 19 Dec 2025 12:32:45 +0100 Subject: [PATCH 6/7] fix merge errors --- app/lib/task/models.dart | 12 ++++-------- app/lib/task/scheduler.dart | 8 +++++++- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/app/lib/task/models.dart b/app/lib/task/models.dart index ec9f650c90..b13f67931c 100644 --- a/app/lib/task/models.dart +++ b/app/lib/task/models.dart @@ -380,10 +380,6 @@ class PackageVersionStateInfo { ); } - /// Derives a new version state with scheduling information. - PackageVersionStateInfo scheduleNew({ - required String zone, - required String instanceName, /// Create updated [PackageVersionStateInfo] for when a new instance have been /// scheduled. /// @@ -400,11 +396,11 @@ class PackageVersionStateInfo { required String secretToken, }) { return PackageVersionStateInfo( - scheduled: clock.now(), + scheduled: scheduled, attempts: attempts + 1, zone: zone, instance: instanceName, - secretToken: createUuid(), + secretToken: secretToken, finished: finished, docs: docs, pana: pana, @@ -414,7 +410,7 @@ class PackageVersionStateInfo { /// Reverts the status of the last scheduling attempt, when it is known that scheduling failed. /// /// > [!WARNING] - /// > This state transition **may only** be used if it's + /// > This state transition **may only** be used if it's /// > **known with certainty** that scheduling failed. /// > /// > If an instance _may_ have been scheduled, but we suspect @@ -422,7 +418,7 @@ class PackageVersionStateInfo { /// > As we otherwise risk leaving an instance unable to call back, /// > which will leave the instance logging errors that indicate /// > internal errors in our system. - PackageVersionStateInfo resetAfterFailedAttempt({required DateTime previousScheduled}) { + PackageVersionStateInfo resetAfterFailedAttempt() { return PackageVersionStateInfo( scheduled: initialTimestamp, attempts: max(0, attempts - 1), diff --git a/app/lib/task/scheduler.dart b/app/lib/task/scheduler.dart index 6fd5f4f838..f31d00a35f 100644 --- a/app/lib/task/scheduler.dart +++ b/app/lib/task/scheduler.dart @@ -10,6 +10,7 @@ import 'package:meta/meta.dart'; import 'package:pub_dev/package/backend.dart'; import 'package:pub_dev/shared/configuration.dart'; import 'package:pub_dev/shared/datastore.dart'; +import 'package:pub_dev/shared/utils.dart'; import 'package:pub_dev/task/backend.dart'; import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; import 'package:pub_dev/task/models.dart'; @@ -245,7 +246,12 @@ Future updatePackageStateWithPendingVersions( // Update PackageState s.versions!.addAll({ for (final v in pendingVersions.map((v) => v.toString())) - v: s.versions![v]!.scheduleNew(zone: zone, instanceName: instanceName), + v: s.versions![v]!.scheduleNew( + scheduled: clock.now(), + zone: zone, + instanceName: instanceName, + secretToken: createUuid(), + ), }); s.pendingAt = derivePendingAt( versions: s.versions!, From 8508e6ae6dd1cafab971edd29f4c04f8d3d5332d Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Fri, 19 Dec 2025 12:33:22 +0100 Subject: [PATCH 7/7] use the same now --- app/lib/task/scheduler.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/lib/task/scheduler.dart b/app/lib/task/scheduler.dart index f31d00a35f..5c57730145 100644 --- a/app/lib/task/scheduler.dart +++ b/app/lib/task/scheduler.dart @@ -247,7 +247,7 @@ Future updatePackageStateWithPendingVersions( s.versions!.addAll({ for (final v in pendingVersions.map((v) => v.toString())) v: s.versions![v]!.scheduleNew( - scheduled: clock.now(), + scheduled: now, zone: zone, instanceName: instanceName, secretToken: createUuid(),