diff --git a/packages/computed/lib/computed.dart b/packages/computed/lib/computed.dart index 4f13298..aa7a149 100644 --- a/packages/computed/lib/computed.dart +++ b/packages/computed/lib/computed.dart @@ -59,6 +59,8 @@ abstract interface class Computed { /// gains a value or throws an exception for the /// first time or when the result of the computation changes. /// [onError] has the same semantics as in [Stream.listen]. + /// + /// Cannot be used inside computations. Instead, see [use]. ComputedSubscription listen(void Function(T event)? onData, [Function? onError]); diff --git a/packages/computed/lib/src/computed.dart b/packages/computed/lib/src/computed.dart index b1577c0..8e36acb 100644 --- a/packages/computed/lib/src/computed.dart +++ b/packages/computed/lib/src/computed.dart @@ -81,14 +81,16 @@ const _noReactivityInsideReact = "`use`, `useWeak` and `react` not allowed inside react callbacks."; const _noReactivityOutsideComputations = "`use`, `useWeak`, `react` and `prev` are only allowed inside computations."; +const _noListenInsideComputations = + "`listen` is not allowed inside computations."; class GlobalCtx { - static ComputedImpl? _currentComputation; static ComputedImpl get currentComputation { - if (_currentComputation == null) { + final node = _updateCtx?._currentComputation; + if (node == null) { throw StateError(_noReactivityOutsideComputations); } - return _currentComputation!; + return node; } @visibleForTesting @@ -127,50 +129,54 @@ class GlobalCtx { static final _routerExpando = Expando<_RouterValueOrException>('computed'); static var _currentUpdate = _Token(); // Guaranteed to be unique thanks to GC - static Set _currentUpdateNodes = {}; - static Expando _currentUpdateNodeDirty = Expando(); - static Expando> - _currentUpdateUpstreamComputations = Expando(); - static var _reacting = false; + static _UpdateCtx? _updateCtx; } -void _injectNodesToDAG(Set nodes) { - for (var c in nodes) { - GlobalCtx._currentUpdateNodeDirty[c] = true; +class _UpdateCtx { + ComputedImpl? _currentComputation; + final _currentUpdateNodes = {}; + final _currentUpdateNodeDirty = Expando(); + final _currentUpdateUpstreamComputations = + Expando>(); + var _reacting = false; + + void _injectNodesToDAG(Set nodes) { + for (var c in nodes) { + _currentUpdateNodeDirty[c] = true; + } + _currentUpdateNodes.addAll(nodes.cast()); } - GlobalCtx._currentUpdateNodes.addAll(nodes.cast()); } void _rerunGraph(Set roots) { - GlobalCtx._currentUpdateNodes = {}; - GlobalCtx._currentUpdateNodeDirty = Expando(); - GlobalCtx._currentUpdateUpstreamComputations = Expando(); - _injectNodesToDAG(roots); - - void evalAfterEnsureUpstreamEvald(ComputedImpl node) { - for (var c in node._lastUpstreamComputations.keys) { - if (c._lastUpdate != GlobalCtx._currentUpdate) { - evalAfterEnsureUpstreamEvald(c); + final updateCtx = _UpdateCtx(); + try { + GlobalCtx._updateCtx = updateCtx; + updateCtx._injectNodesToDAG(roots); + + void evalAfterEnsureUpstreamEvald(ComputedImpl node) { + for (var c in node._lastUpstreamComputations.keys) { + if (c._lastUpdate != GlobalCtx._currentUpdate) { + evalAfterEnsureUpstreamEvald(c); + } } - } - // It is possible that this node has been forced to be evaluated by another - // In this case, do not re-compute it again - if (GlobalCtx._currentUpdateNodeDirty[node] == true) { - try { + // It is possible that this node has been forced to be evaluated by another + // In this case, do not re-compute it again + if (updateCtx._currentUpdateNodeDirty[node] == true) { final downstream = node.onDependencyUpdated(); - _injectNodesToDAG(downstream); - } on NoValueException { - // Pass. We must still consider the downstream. + updateCtx._injectNodesToDAG(downstream); } } - } - while (GlobalCtx._currentUpdateNodes.isNotEmpty) { - final cur = GlobalCtx._currentUpdateNodes.first; - GlobalCtx._currentUpdateNodes.remove(cur); - if (cur._lastUpdate == GlobalCtx._currentUpdate) continue; - evalAfterEnsureUpstreamEvald(cur); + while (updateCtx._currentUpdateNodes.isNotEmpty) { + final cur = updateCtx._currentUpdateNodes.first; + updateCtx._currentUpdateNodes.remove(cur); + if (cur._lastUpdate == GlobalCtx._currentUpdate) continue; + evalAfterEnsureUpstreamEvald(cur); + } + } finally { + GlobalCtx._updateCtx = null; } } @@ -186,7 +192,7 @@ class ComputedImpl implements Computed { {}; bool get _computing => - GlobalCtx._currentUpdateUpstreamComputations[this] != null; + GlobalCtx._updateCtx?._currentUpdateUpstreamComputations[this] != null; final _memoizedDownstreamComputations = {}; final _weakDownstreamComputations = @@ -214,11 +220,12 @@ class ComputedImpl implements Computed { if (_computing) throw CyclicUseException(); final caller = GlobalCtx.currentComputation; - if (GlobalCtx._reacting) { + final updateCtx = GlobalCtx._updateCtx!; + if (updateCtx._reacting == true) { throw StateError(_noReactivityInsideReact); } // Make sure the caller is subscribed, upgrade to non-weak if needed - GlobalCtx._currentUpdateUpstreamComputations[caller]!.update( + updateCtx._currentUpdateUpstreamComputations[caller]!.update( this, (v) => _WeakMemoizedValueOrException(weak && v._weak, true, _lastResult), @@ -234,7 +241,7 @@ class ComputedImpl implements Computed { _use(false); if (_lastUpdate != GlobalCtx._currentUpdate && _lastResult == null) { final downstream = eval(); - _injectNodesToDAG(downstream); + GlobalCtx._updateCtx!._injectNodesToDAG(downstream); } switch (_lastResult) { @@ -306,15 +313,13 @@ class ComputedImpl implements Computed { @override ComputedSubscription listen(void Function(T event)? onData, [Function? onError]) { + if (GlobalCtx._updateCtx?._currentComputation != null) { + throw StateError(_noListenInsideComputations); + } _validateOnError(onError); final sub = _ComputedSubscriptionImpl(this, onData, onError); if (_novalue) { - try { - eval(); - // Might set lastResult, won't notify the listener just yet (as that is against the Stream contract) - } on NoValueException { - // It is fine if we don't have a value yet - } + _rerunGraph({this}); } _listeners[sub] = false; @@ -399,7 +404,22 @@ class ComputedImpl implements Computed { // Returns the set of downstream nodes to be re-computed. // This is public so that it can be customized by subclasses Set onDependencyUpdated() { - return eval(); + // Create a new update context if needed + final newContext = GlobalCtx._updateCtx == null; + if (newContext) { + GlobalCtx._updateCtx = _UpdateCtx(); + } + try { + return eval(); + } on NoValueException { + // Do not notify the downstream if this computation + // did not produce a value + return {}; + } finally { + if (newContext) { + GlobalCtx._updateCtx = null; + } + } } T _evalFInZone() { @@ -433,13 +453,14 @@ class ComputedImpl implements Computed { Set eval() { const idempotencyFailureMessage = "Computed expressions must be purely functional. Please use listeners for side effects. For computations creating asynchronous operations, make sure to use `Computed.async`."; - GlobalCtx._currentUpdateNodeDirty[this] = null; - final oldComputation = GlobalCtx._currentComputation; + final updateCtx = GlobalCtx._updateCtx!; + final oldComputation = updateCtx._currentComputation; + updateCtx._currentUpdateNodeDirty[this] = null; bool shouldNotify = false; try { _prevResult = _lastResult; - GlobalCtx._currentUpdateUpstreamComputations[this] = {}; - GlobalCtx._currentComputation = this; + updateCtx._currentUpdateUpstreamComputations[this] = {}; + updateCtx._currentComputation = this; var newResult = _evalFGuarded(); if (_assertIdempotent && switch (newResult) { @@ -479,12 +500,12 @@ class ComputedImpl implements Computed { // Commit the changes to the DAG for (var e - in GlobalCtx._currentUpdateUpstreamComputations[this]!.entries) { + in updateCtx._currentUpdateUpstreamComputations[this]!.entries) { final up = e.key; up._addDownstreamComputation(this, e.value._memoized, e.value._weak); } final oldDiffNew = _lastUpstreamComputations.keys.toSet().difference( - GlobalCtx._currentUpdateUpstreamComputations[this]!.keys.toSet()); + updateCtx._currentUpdateUpstreamComputations[this]!.keys.toSet()); for (var up in oldDiffNew) { up._removeDownstreamComputation(this); } @@ -493,8 +514,8 @@ class ComputedImpl implements Computed { // So that we can memoize that f threw NoValueException // when ran with a specific set of dependencies, for example. _lastUpstreamComputations = - GlobalCtx._currentUpdateUpstreamComputations[this]!; - GlobalCtx._currentUpdateUpstreamComputations[this] = null; + updateCtx._currentUpdateUpstreamComputations[this]!; + updateCtx._currentUpdateUpstreamComputations[this] = null; // Bookkeep the fact the we ran/tried to run this computation // so that we can unlock its downstream during the DAG walk _lastUpdate = GlobalCtx._currentUpdate; @@ -522,10 +543,10 @@ class ComputedImpl implements Computed { } finally { assert(_lastUpdate == GlobalCtx._currentUpdate); if (shouldNotify) { - GlobalCtx._currentComputation = null; + updateCtx._currentComputation = null; _notifyListeners(); } - GlobalCtx._currentComputation = oldComputation; + updateCtx._currentComputation = oldComputation; } } @@ -685,54 +706,72 @@ final class RouterImpl extends ComputedImpl { super._hasDownstreamComputations() || _nonMemoizedDownstreamComputations.isNotEmpty; + void _maybeDelay(void Function() f) async { + if (GlobalCtx._updateCtx != null) { + // An update is already going on. A data source + // must have notified us reentrantly. Process this + // event in a new microtask. + await Future.value(); + // Dag runs never span across microtasks + assert(GlobalCtx._updateCtx == null); + } + + f(); + } + void onDataSourceData(T data) { if (_dss == null) return; - GlobalCtx._currentUpdate = _Token(); - _dss!._lastEmit = GlobalCtx._currentUpdate; - final rvoe = - GlobalCtx._routerExpando[_dss!._ds] as _RouterValueOrException; - if (switch (rvoe._voe) { - Value(value: final value) => value != data, - _ => true - }) { - // Update the global last value cache - rvoe._voe = ValueOrException.value(data); - } else if (_nonMemoizedDownstreamComputations.isEmpty) { - return; - } + _maybeDelay(() { + GlobalCtx._currentUpdate = _Token(); + _dss!._lastEmit = GlobalCtx._currentUpdate; + final rvoe = + GlobalCtx._routerExpando[_dss!._ds] as _RouterValueOrException; + if (switch (rvoe._voe) { + Value(value: final value) => value != data, + _ => true + }) { + // Update the global last value cache + rvoe._voe = ValueOrException.value(data); + } else if (_nonMemoizedDownstreamComputations.isEmpty) { + return; + } - _rerunGraph({this}); + _rerunGraph({this}); + }); } void onDataSourceError(Object err, StackTrace st) { if (_dss == null) return; - GlobalCtx._currentUpdate = _Token(); - - _dss!._lastEmit = GlobalCtx._currentUpdate; - final rvoe = - GlobalCtx._routerExpando[_dss!._ds] as _RouterValueOrException; - if (switch (rvoe._voe) { - Exception(exc: final exc) => exc != err, - _ => true - }) { - // Update the global last value cache - rvoe._voe = ValueOrException.exc(err, st); - } else if (_nonMemoizedDownstreamComputations.isEmpty) { - return; - } + _maybeDelay(() { + GlobalCtx._currentUpdate = _Token(); + + _dss!._lastEmit = GlobalCtx._currentUpdate; + final rvoe = + GlobalCtx._routerExpando[_dss!._ds] as _RouterValueOrException; + if (switch (rvoe._voe) { + Exception(exc: final exc) => exc != err, + _ => true + }) { + // Update the global last value cache + rvoe._voe = ValueOrException.exc(err, st); + } else if (_nonMemoizedDownstreamComputations.isEmpty) { + return; + } - _rerunGraph({this}); + _rerunGraph({this}); + }); } void _react(void Function(T) onData, Function? onError) { // Only routers can be .react-ed to assert(_dss != null); final caller = GlobalCtx.currentComputation; - if (GlobalCtx._reacting) { + final updateCtx = GlobalCtx._updateCtx!; + if (updateCtx._reacting) { throw StateError(_noReactivityInsideReact); } // Make sure the caller is subscribed - GlobalCtx._currentUpdateUpstreamComputations[caller]![this] = + updateCtx._currentUpdateUpstreamComputations[caller]![this] = _WeakMemoizedValueOrException(false, false, _lastResult); if (_dss!._lastEmit != GlobalCtx._currentUpdate) { @@ -740,7 +779,7 @@ final class RouterImpl extends ComputedImpl { return; } - GlobalCtx._reacting = true; + updateCtx._reacting = true; try { switch (_lastResult!) { case Value(value: final value): @@ -756,7 +795,7 @@ final class RouterImpl extends ComputedImpl { } } } finally { - GlobalCtx._reacting = false; + updateCtx._reacting = false; } } diff --git a/packages/computed/lib/src/computed_stream.dart b/packages/computed/lib/src/computed_stream.dart index bed444c..e63b9cd 100644 --- a/packages/computed/lib/src/computed_stream.dart +++ b/packages/computed/lib/src/computed_stream.dart @@ -23,7 +23,10 @@ class ComputedStreamExtensionImpl { // No onPause and onResume, as Computed doesn't support these. } - void _onListen() { + void _onListen() async { + // StreamController can call onListen synchronously, + // so call .listen in a separate microtask. + await Future.value(); _computedSubscription ??= _parent.listen((event) => _controller!.add(event), (error) => _controller!.addError(error)); } diff --git a/packages/computed/test/computed_test.dart b/packages/computed/test/computed_test.dart index 353eb27..b69b10d 100644 --- a/packages/computed/test/computed_test.dart +++ b/packages/computed/test/computed_test.dart @@ -1438,7 +1438,10 @@ void main() { }, (e) { expect(flag, false); flag = true; - expect(e, isA()); + expect( + e, + isA().having((e) => e.message, 'message', + "`listen` is not allowed inside computations.")); }); await Future.value(); diff --git a/packages/computed/test/delayed_reeval_test.dart b/packages/computed/test/delayed_reeval_test.dart index f3de448..91c018e 100644 --- a/packages/computed/test/delayed_reeval_test.dart +++ b/packages/computed/test/delayed_reeval_test.dart @@ -45,16 +45,21 @@ void main() { expect(odcCnt, 1); expect(lCnt, 0); c.reeval(); - expect(odcCnt, 1); + expect(odcCnt, 1); // Because it gained a listener for the first time + expect(lCnt, 0); // As the ValueStream has not notified Computed yet + await Future.value(); + expect(odcCnt, 2); // Because the ValueStream now has a value + expect(lCnt, 0); // We haven't called reeval yet + c.reeval(); expect(lCnt, 1); expect(lastEvent, 0); s.add(0); - expect(odcCnt, 1); - s.add(1); expect(odcCnt, 2); + s.add(1); + expect(odcCnt, 3); expect(lCnt, 1); c.reeval(); - expect(odcCnt, 2); + expect(odcCnt, 3); expect(lCnt, 2); expect(lastEvent, 1);