From abeea9a2c5edf2980925904c6395140f5a7a607f Mon Sep 17 00:00:00 2001 From: Kartal Kaan Bozdogan Date: Sun, 30 Mar 2025 22:55:31 +0200 Subject: [PATCH] Moved some fields from the GlobalCtx to a new class, _UpdateCtx Lets us swiftly remove all dag run-related state once it is done Ban .listen even for async computations onDependencyUpdated no longer throws exceptions NoValueException Instead returns an empty downstream .listen no longer calls eval() directly It used to bypass onDependencyChanged, breaking the delayed eval pattern Routers: Delay the delivery of data if a dag run is already going on StreamExtension: Delay the onListen event by one microtask to make sure we don't call .listen within a computation in case we are being used by a computation --- packages/computed/lib/computed.dart | 2 + packages/computed/lib/src/computed.dart | 217 +++++++++++------- .../computed/lib/src/computed_stream.dart | 5 +- packages/computed/test/computed_test.dart | 5 +- .../computed/test/delayed_reeval_test.dart | 13 +- 5 files changed, 147 insertions(+), 95 deletions(-) diff --git a/packages/computed/lib/computed.dart b/packages/computed/lib/computed.dart index 4f13298..aa7a149 100644 --- a/packages/computed/lib/computed.dart +++ b/packages/computed/lib/computed.dart @@ -59,6 +59,8 @@ abstract interface class Computed { /// gains a value or throws an exception for the /// first time or when the result of the computation changes. /// [onError] has the same semantics as in [Stream.listen]. + /// + /// Cannot be used inside computations. Instead, see [use]. ComputedSubscription listen(void Function(T event)? onData, [Function? onError]); diff --git a/packages/computed/lib/src/computed.dart b/packages/computed/lib/src/computed.dart index b1577c0..8e36acb 100644 --- a/packages/computed/lib/src/computed.dart +++ b/packages/computed/lib/src/computed.dart @@ -81,14 +81,16 @@ const _noReactivityInsideReact = "`use`, `useWeak` and `react` not allowed inside react callbacks."; const _noReactivityOutsideComputations = "`use`, `useWeak`, `react` and `prev` are only allowed inside computations."; +const _noListenInsideComputations = + "`listen` is not allowed inside computations."; class GlobalCtx { - static ComputedImpl? _currentComputation; static ComputedImpl get currentComputation { - if (_currentComputation == null) { + final node = _updateCtx?._currentComputation; + if (node == null) { throw StateError(_noReactivityOutsideComputations); } - return _currentComputation!; + return node; } @visibleForTesting @@ -127,50 +129,54 @@ class GlobalCtx { static final _routerExpando = Expando<_RouterValueOrException>('computed'); static var _currentUpdate = _Token(); // Guaranteed to be unique thanks to GC - static Set _currentUpdateNodes = {}; - static Expando _currentUpdateNodeDirty = Expando(); - static Expando> - _currentUpdateUpstreamComputations = Expando(); - static var _reacting = false; + static _UpdateCtx? _updateCtx; } -void _injectNodesToDAG(Set nodes) { - for (var c in nodes) { - GlobalCtx._currentUpdateNodeDirty[c] = true; +class _UpdateCtx { + ComputedImpl? _currentComputation; + final _currentUpdateNodes = {}; + final _currentUpdateNodeDirty = Expando(); + final _currentUpdateUpstreamComputations = + Expando>(); + var _reacting = false; + + void _injectNodesToDAG(Set nodes) { + for (var c in nodes) { + _currentUpdateNodeDirty[c] = true; + } + _currentUpdateNodes.addAll(nodes.cast()); } - GlobalCtx._currentUpdateNodes.addAll(nodes.cast()); } void _rerunGraph(Set roots) { - GlobalCtx._currentUpdateNodes = {}; - GlobalCtx._currentUpdateNodeDirty = Expando(); - GlobalCtx._currentUpdateUpstreamComputations = Expando(); - _injectNodesToDAG(roots); - - void evalAfterEnsureUpstreamEvald(ComputedImpl node) { - for (var c in node._lastUpstreamComputations.keys) { - if (c._lastUpdate != GlobalCtx._currentUpdate) { - evalAfterEnsureUpstreamEvald(c); + final updateCtx = _UpdateCtx(); + try { + GlobalCtx._updateCtx = updateCtx; + updateCtx._injectNodesToDAG(roots); + + void evalAfterEnsureUpstreamEvald(ComputedImpl node) { + for (var c in node._lastUpstreamComputations.keys) { + if (c._lastUpdate != GlobalCtx._currentUpdate) { + evalAfterEnsureUpstreamEvald(c); + } } - } - // It is possible that this node has been forced to be evaluated by another - // In this case, do not re-compute it again - if (GlobalCtx._currentUpdateNodeDirty[node] == true) { - try { + // It is possible that this node has been forced to be evaluated by another + // In this case, do not re-compute it again + if (updateCtx._currentUpdateNodeDirty[node] == true) { final downstream = node.onDependencyUpdated(); - _injectNodesToDAG(downstream); - } on NoValueException { - // Pass. We must still consider the downstream. + updateCtx._injectNodesToDAG(downstream); } } - } - while (GlobalCtx._currentUpdateNodes.isNotEmpty) { - final cur = GlobalCtx._currentUpdateNodes.first; - GlobalCtx._currentUpdateNodes.remove(cur); - if (cur._lastUpdate == GlobalCtx._currentUpdate) continue; - evalAfterEnsureUpstreamEvald(cur); + while (updateCtx._currentUpdateNodes.isNotEmpty) { + final cur = updateCtx._currentUpdateNodes.first; + updateCtx._currentUpdateNodes.remove(cur); + if (cur._lastUpdate == GlobalCtx._currentUpdate) continue; + evalAfterEnsureUpstreamEvald(cur); + } + } finally { + GlobalCtx._updateCtx = null; } } @@ -186,7 +192,7 @@ class ComputedImpl implements Computed { {}; bool get _computing => - GlobalCtx._currentUpdateUpstreamComputations[this] != null; + GlobalCtx._updateCtx?._currentUpdateUpstreamComputations[this] != null; final _memoizedDownstreamComputations = {}; final _weakDownstreamComputations = @@ -214,11 +220,12 @@ class ComputedImpl implements Computed { if (_computing) throw CyclicUseException(); final caller = GlobalCtx.currentComputation; - if (GlobalCtx._reacting) { + final updateCtx = GlobalCtx._updateCtx!; + if (updateCtx._reacting == true) { throw StateError(_noReactivityInsideReact); } // Make sure the caller is subscribed, upgrade to non-weak if needed - GlobalCtx._currentUpdateUpstreamComputations[caller]!.update( + updateCtx._currentUpdateUpstreamComputations[caller]!.update( this, (v) => _WeakMemoizedValueOrException(weak && v._weak, true, _lastResult), @@ -234,7 +241,7 @@ class ComputedImpl implements Computed { _use(false); if (_lastUpdate != GlobalCtx._currentUpdate && _lastResult == null) { final downstream = eval(); - _injectNodesToDAG(downstream); + GlobalCtx._updateCtx!._injectNodesToDAG(downstream); } switch (_lastResult) { @@ -306,15 +313,13 @@ class ComputedImpl implements Computed { @override ComputedSubscription listen(void Function(T event)? onData, [Function? onError]) { + if (GlobalCtx._updateCtx?._currentComputation != null) { + throw StateError(_noListenInsideComputations); + } _validateOnError(onError); final sub = _ComputedSubscriptionImpl(this, onData, onError); if (_novalue) { - try { - eval(); - // Might set lastResult, won't notify the listener just yet (as that is against the Stream contract) - } on NoValueException { - // It is fine if we don't have a value yet - } + _rerunGraph({this}); } _listeners[sub] = false; @@ -399,7 +404,22 @@ class ComputedImpl implements Computed { // Returns the set of downstream nodes to be re-computed. // This is public so that it can be customized by subclasses Set onDependencyUpdated() { - return eval(); + // Create a new update context if needed + final newContext = GlobalCtx._updateCtx == null; + if (newContext) { + GlobalCtx._updateCtx = _UpdateCtx(); + } + try { + return eval(); + } on NoValueException { + // Do not notify the downstream if this computation + // did not produce a value + return {}; + } finally { + if (newContext) { + GlobalCtx._updateCtx = null; + } + } } T _evalFInZone() { @@ -433,13 +453,14 @@ class ComputedImpl implements Computed { Set eval() { const idempotencyFailureMessage = "Computed expressions must be purely functional. Please use listeners for side effects. For computations creating asynchronous operations, make sure to use `Computed.async`."; - GlobalCtx._currentUpdateNodeDirty[this] = null; - final oldComputation = GlobalCtx._currentComputation; + final updateCtx = GlobalCtx._updateCtx!; + final oldComputation = updateCtx._currentComputation; + updateCtx._currentUpdateNodeDirty[this] = null; bool shouldNotify = false; try { _prevResult = _lastResult; - GlobalCtx._currentUpdateUpstreamComputations[this] = {}; - GlobalCtx._currentComputation = this; + updateCtx._currentUpdateUpstreamComputations[this] = {}; + updateCtx._currentComputation = this; var newResult = _evalFGuarded(); if (_assertIdempotent && switch (newResult) { @@ -479,12 +500,12 @@ class ComputedImpl implements Computed { // Commit the changes to the DAG for (var e - in GlobalCtx._currentUpdateUpstreamComputations[this]!.entries) { + in updateCtx._currentUpdateUpstreamComputations[this]!.entries) { final up = e.key; up._addDownstreamComputation(this, e.value._memoized, e.value._weak); } final oldDiffNew = _lastUpstreamComputations.keys.toSet().difference( - GlobalCtx._currentUpdateUpstreamComputations[this]!.keys.toSet()); + updateCtx._currentUpdateUpstreamComputations[this]!.keys.toSet()); for (var up in oldDiffNew) { up._removeDownstreamComputation(this); } @@ -493,8 +514,8 @@ class ComputedImpl implements Computed { // So that we can memoize that f threw NoValueException // when ran with a specific set of dependencies, for example. _lastUpstreamComputations = - GlobalCtx._currentUpdateUpstreamComputations[this]!; - GlobalCtx._currentUpdateUpstreamComputations[this] = null; + updateCtx._currentUpdateUpstreamComputations[this]!; + updateCtx._currentUpdateUpstreamComputations[this] = null; // Bookkeep the fact the we ran/tried to run this computation // so that we can unlock its downstream during the DAG walk _lastUpdate = GlobalCtx._currentUpdate; @@ -522,10 +543,10 @@ class ComputedImpl implements Computed { } finally { assert(_lastUpdate == GlobalCtx._currentUpdate); if (shouldNotify) { - GlobalCtx._currentComputation = null; + updateCtx._currentComputation = null; _notifyListeners(); } - GlobalCtx._currentComputation = oldComputation; + updateCtx._currentComputation = oldComputation; } } @@ -685,54 +706,72 @@ final class RouterImpl extends ComputedImpl { super._hasDownstreamComputations() || _nonMemoizedDownstreamComputations.isNotEmpty; + void _maybeDelay(void Function() f) async { + if (GlobalCtx._updateCtx != null) { + // An update is already going on. A data source + // must have notified us reentrantly. Process this + // event in a new microtask. + await Future.value(); + // Dag runs never span across microtasks + assert(GlobalCtx._updateCtx == null); + } + + f(); + } + void onDataSourceData(T data) { if (_dss == null) return; - GlobalCtx._currentUpdate = _Token(); - _dss!._lastEmit = GlobalCtx._currentUpdate; - final rvoe = - GlobalCtx._routerExpando[_dss!._ds] as _RouterValueOrException; - if (switch (rvoe._voe) { - Value(value: final value) => value != data, - _ => true - }) { - // Update the global last value cache - rvoe._voe = ValueOrException.value(data); - } else if (_nonMemoizedDownstreamComputations.isEmpty) { - return; - } + _maybeDelay(() { + GlobalCtx._currentUpdate = _Token(); + _dss!._lastEmit = GlobalCtx._currentUpdate; + final rvoe = + GlobalCtx._routerExpando[_dss!._ds] as _RouterValueOrException; + if (switch (rvoe._voe) { + Value(value: final value) => value != data, + _ => true + }) { + // Update the global last value cache + rvoe._voe = ValueOrException.value(data); + } else if (_nonMemoizedDownstreamComputations.isEmpty) { + return; + } - _rerunGraph({this}); + _rerunGraph({this}); + }); } void onDataSourceError(Object err, StackTrace st) { if (_dss == null) return; - GlobalCtx._currentUpdate = _Token(); - - _dss!._lastEmit = GlobalCtx._currentUpdate; - final rvoe = - GlobalCtx._routerExpando[_dss!._ds] as _RouterValueOrException; - if (switch (rvoe._voe) { - Exception(exc: final exc) => exc != err, - _ => true - }) { - // Update the global last value cache - rvoe._voe = ValueOrException.exc(err, st); - } else if (_nonMemoizedDownstreamComputations.isEmpty) { - return; - } + _maybeDelay(() { + GlobalCtx._currentUpdate = _Token(); + + _dss!._lastEmit = GlobalCtx._currentUpdate; + final rvoe = + GlobalCtx._routerExpando[_dss!._ds] as _RouterValueOrException; + if (switch (rvoe._voe) { + Exception(exc: final exc) => exc != err, + _ => true + }) { + // Update the global last value cache + rvoe._voe = ValueOrException.exc(err, st); + } else if (_nonMemoizedDownstreamComputations.isEmpty) { + return; + } - _rerunGraph({this}); + _rerunGraph({this}); + }); } void _react(void Function(T) onData, Function? onError) { // Only routers can be .react-ed to assert(_dss != null); final caller = GlobalCtx.currentComputation; - if (GlobalCtx._reacting) { + final updateCtx = GlobalCtx._updateCtx!; + if (updateCtx._reacting) { throw StateError(_noReactivityInsideReact); } // Make sure the caller is subscribed - GlobalCtx._currentUpdateUpstreamComputations[caller]![this] = + updateCtx._currentUpdateUpstreamComputations[caller]![this] = _WeakMemoizedValueOrException(false, false, _lastResult); if (_dss!._lastEmit != GlobalCtx._currentUpdate) { @@ -740,7 +779,7 @@ final class RouterImpl extends ComputedImpl { return; } - GlobalCtx._reacting = true; + updateCtx._reacting = true; try { switch (_lastResult!) { case Value(value: final value): @@ -756,7 +795,7 @@ final class RouterImpl extends ComputedImpl { } } } finally { - GlobalCtx._reacting = false; + updateCtx._reacting = false; } } diff --git a/packages/computed/lib/src/computed_stream.dart b/packages/computed/lib/src/computed_stream.dart index bed444c..e63b9cd 100644 --- a/packages/computed/lib/src/computed_stream.dart +++ b/packages/computed/lib/src/computed_stream.dart @@ -23,7 +23,10 @@ class ComputedStreamExtensionImpl { // No onPause and onResume, as Computed doesn't support these. } - void _onListen() { + void _onListen() async { + // StreamController can call onListen synchronously, + // so call .listen in a separate microtask. + await Future.value(); _computedSubscription ??= _parent.listen((event) => _controller!.add(event), (error) => _controller!.addError(error)); } diff --git a/packages/computed/test/computed_test.dart b/packages/computed/test/computed_test.dart index 353eb27..b69b10d 100644 --- a/packages/computed/test/computed_test.dart +++ b/packages/computed/test/computed_test.dart @@ -1438,7 +1438,10 @@ void main() { }, (e) { expect(flag, false); flag = true; - expect(e, isA()); + expect( + e, + isA().having((e) => e.message, 'message', + "`listen` is not allowed inside computations.")); }); await Future.value(); diff --git a/packages/computed/test/delayed_reeval_test.dart b/packages/computed/test/delayed_reeval_test.dart index f3de448..91c018e 100644 --- a/packages/computed/test/delayed_reeval_test.dart +++ b/packages/computed/test/delayed_reeval_test.dart @@ -45,16 +45,21 @@ void main() { expect(odcCnt, 1); expect(lCnt, 0); c.reeval(); - expect(odcCnt, 1); + expect(odcCnt, 1); // Because it gained a listener for the first time + expect(lCnt, 0); // As the ValueStream has not notified Computed yet + await Future.value(); + expect(odcCnt, 2); // Because the ValueStream now has a value + expect(lCnt, 0); // We haven't called reeval yet + c.reeval(); expect(lCnt, 1); expect(lastEvent, 0); s.add(0); - expect(odcCnt, 1); - s.add(1); expect(odcCnt, 2); + s.add(1); + expect(odcCnt, 3); expect(lCnt, 1); c.reeval(); - expect(odcCnt, 2); + expect(odcCnt, 3); expect(lCnt, 2); expect(lastEvent, 1);