diff --git a/mangle.json b/mangle.json index fc1e8810f..d3a9cb1a4 100644 --- a/mangle.json +++ b/mangle.json @@ -33,7 +33,20 @@ "$_canActivate": "_c", "$_readonly": "_r", "$_requiresUpdate": "_q", - "$_props": "__" + "$_props": "__", + + "$_children": "_c", + "$_wakeup": "_w", + "$_first": "_i", + "$_dirty": "_d", + "$_hasError": "_h", + "$_weak": "_r", + "$_parents": "_p", + "$_callback": "_f", + "$_addDependency": "_a", + "$_removeDependencies": "_u", + "$_listener": "_l", + "$_listen": "_s" } } } diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 9b5dbe2d9..5f29b7739 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -1,305 +1,341 @@ -let ROOT: Signal; - -/** This tracks subscriptions of signals read inside a computed */ -let currentSignal: Signal; -let commitError: Error | null = null; - -const pending = new Set(); -/** Batch calls can be nested. 0 means that there is no batching */ -let batchPending = 0; -/** - * Subscriptions are set up lazily when a "reactor" is set up. - * During this activation phase we traverse the graph upwards - * and refresh all signals that are stale on signal read. - */ -let activating = false; - -let oldDeps = new Set(); - -export class Signal { - // These property names get minified - see /mangle.json - - /** @internal Internal, do not use. */ - _subs = new Set(); - /** @internal Internal, do not use. */ - _deps = new Set(); - /** @internal Internal, do not use. */ - _pending = 0; - /** @internal Internal, do not use. */ - _value: T; - /** @internal Determine if a computed is allowed to write or not */ - _readonly = false; - /** @internal Marks the signal as requiring an update */ - _requiresUpdate = false; - /** @internal Determine if reads should eagerly activate value */ - _canActivate = false; - /** @internal Used to detect if there is a cycle in the graph */ - _isComputing = false; - - constructor(value: T) { - this._value = value; - } - - toString() { - return "" + this.value; - } - - peek() { - if (currentSignal._canActivate && this._deps.size === 0) { - activate(this); - } - return this._value; - } - - get value() { - // If we read a signal outside of a computed we have no way - // to unsubscribe from that. So we assume that the user wants - // to get the value immediately like for testing. - if (currentSignal._canActivate && this._deps.size === 0) { - activate(this); - - // The ROOT signal cannot track dependencies as it's never - // subscribed to - if (currentSignal === ROOT) { - return this._value; - } - } - - // subscribe the current computed to this signal: - this._subs.add(currentSignal); - // update the current computed's dependencies: - currentSignal._deps.add(this); - oldDeps.delete(this); - - // refresh stale value when this signal is read from withing - // batching and when it has been marked already - if ( - (batchPending > 0 && this._pending > 0) || - // Set up subscriptions during activation phase - (activating && this._deps.size === 0) - ) { - refreshStale(this); - } - return this._value; - } - - set value(value) { - if (this._readonly) { - throw new Error("Computed signals are readonly"); - } - - if (this._value !== value) { - this._value = value; - let isFirst = pending.size === 0; - pending.add(this); - // in batch mode this signal may be marked already - if (this._pending === 0) { - mark(this); - } - - // this is the first change, not a computed and we are not - // in batch mode: - if (isFirst && batchPending === 0) { - sweep(pending); - pending.clear(); - if (commitError) { - const err = commitError; - // Clear global error flag for next commit - commitError = null; - throw err; - } - } - } - } - - /** - * Start a read operation where this signal is the "current signal" context. - * Returns a function that must be called to end the read context. - * @internal - */ - _setCurrent() { - let prevSignal = currentSignal; - let prevOldDeps = oldDeps; - currentSignal = this; - oldDeps = this._deps; - this._deps = new Set(); - - return (shouldUnmark: boolean, shouldCleanup: boolean) => { - if (shouldUnmark) this._subs.forEach(unmark); - - // Any leftover dependencies here are not needed anymore - if (shouldCleanup) { - // Unsubscribe from dependencies that were not accessed: - oldDeps.forEach(sub => unsubscribe(this, sub)); - } else { - // Re-subscribe to dependencies that not accessed: - oldDeps.forEach(sub => subscribe(this, sub)); - } - - oldDeps.clear(); - oldDeps = prevOldDeps; - currentSignal = prevSignal; - }; - } - - /** - * A custom update routine to run when this Signal's value changes. - * @internal - */ - _updater() { - // override me to handle updates - } +export interface ReadonlySignal { + peek(): T; + readonly value: T; } -function mark(signal: Signal) { - if (signal._pending++ === 0) { - signal._subs.forEach(mark); - } -} +export type Disposer = () => void; + +type Listener = () => void; + + +// The current computed that is running +let currentComputed: Computed | null = null; + +// A set of listeners which will be triggered after the batch is complete +let batchPending: Set | null = null; + +const processingSignals: Set> = new Set(); + + +export function batch(f: () => T): T { + if (batchPending === null) { + const listeners: Set = new Set(); + + const old = batchPending; + batchPending = listeners; + + try { + return f(); + + } finally { + batchPending = old; + processingSignals.clear(); -function unmark(signal: Signal) { - // We can only unmark this node as not needing an update if it - // wasn't flagged as needing an update by someone else. This is - // done to make the sweeping logic independent of the order - // in which a dependency tries to unmark a subtree. - if (!signal._requiresUpdate && --signal._pending === 0) { - signal._subs.forEach(unmark); - } + // Trigger any pending listeners + listeners.forEach((listener) => { + listener(); + }); + } + + // We're already inside of an outer batch + } else { + return f(); + } } -function sweep(subs: Set>) { - subs.forEach(signal => { - // If a computed errored during sweep, we'll discard that subtree - // for this sweep cycle by setting PENDING to 0; - if (signal._pending > 0) { - signal._requiresUpdate = true; - - if (--signal._pending === 0) { - if (signal._isComputing) { - throw new Error("Cycle detected"); - } - - signal._requiresUpdate = false; - signal._isComputing = true; - signal._updater(); - signal._isComputing = false; - sweep(signal._subs); - } - } - }); + +export class Signal { + // These property names get minified - see /mangle.json + + /** @internal */ + protected _value: T; + + constructor(value: T) { + this._value = value; + } + + public toString() { + return "" + this.value; + } + + /** + * This uses WeakRef in order to avoid memory leaks: if the child is not + * used anywhere then it can be garbage collected. + * + * @internal + */ + protected _children: Set>> = new Set(); + + /** + * Recurse down all children, marking them as dirty and adding + * listeners to batchPending. + * + * @internal + */ + protected _wakeup() { + this._children.forEach((childRef) => { + const child = childRef.deref(); + + if (child) { + child._wakeup(); + + // If the child has been garbage collected, then remove it from the Set + } else { + this._children.delete(childRef); + } + }); + } + + public peek(): T { + return this._value; + } + + public get value(): T { + const value = this._value; + + if (currentComputed !== null) { + // This is used to detect infinite cycles + if (batchPending !== null) { + processingSignals.add(this); + } + + // If accessing inside of a computed, add this to the computed's parents + currentComputed._addDependency(this, value); + } + + return value; + } + + public set value(value: T) { + if (currentComputed !== null && batchPending !== null && processingSignals.has(this)) { + throw new Error("Cycle detected"); + } + + this._value = value; + + // If the value is set outside of a batch, this ensures that all of the + // children will be fully marked as dirty before triggering any listeners + batch(() => { + this._wakeup(); + }); + } } -function subscribe(signal: Signal, to: Signal) { - signal._deps.add(to); - to._subs.add(signal); +export function signal(value: T): Signal { + return new Signal(value); } -function unsubscribe(signal: Signal, from: Signal) { - signal._deps.delete(from); - from._subs.delete(signal); - // If nobody listens to the signal we depended on, we can traverse - // upwards and destroy all subscriptions until we encounter a writable - // signal or a signal that others listen to as well. - if (from._subs.size === 0) { - from._deps.forEach(dep => unsubscribe(from, dep)); - } +class Computed extends Signal implements ReadonlySignal { + // These property names get minified - see /mangle.json + + /** + * Whether this is the first time processing this computed + * + * @internal + */ + protected _first: boolean = true; + + /** + * Whether any of the computed's parents have changed or not. + * + * @internal + */ + protected _dirty: boolean = true; + + /** + * Whether the callback errored or not. + * + * @internal + */ + protected _hasError: boolean = false; + + /** + * WeakRefs have their own object identity, so we must reuse + * the same WeakRef over and over again + * + * @internal + */ + protected _weak: WeakRef = new WeakRef(this); + + /** + * The parent dependencies for this computed. + * + * @internal + */ + protected _parents: Map, unknown> = new Map(); + + /** @internal */ + protected _callback: () => T; + + constructor(callback: () => T) { + super(undefined as unknown as T); + this._callback = callback; + } + + /** + * Mark this computed as dirty whenever any of its parents change. + * + * @internal + */ + protected _wakeup() { + this._dirty = true; + super._wakeup(); + } + + /** + * This is called when another Signal's .value is accessed inside of + * this computed, it adds the Signal as a dependency of this computed. + * + * @internal + */ + public _addDependency(parent: Signal, value: unknown) { + this._parents.set(parent, value); + + // This uses a WeakRef to avoid a memory leak + (parent as any)._children.add(this._weak); + } + + /** + * Removes all links between this computed and its dependencies. + * + * @internal + */ + protected _removeDependencies() { + this._parents.forEach((_value, parent) => { + (parent as any)._children.delete(this._weak); + }); + + this._parents.clear(); + } + + public peek(): T { + if (this._dirty) { + this._dirty = false; + + try { + let changed = false; + + if (this._first) { + this._first = false; + changed = true; + + } else { + // This checks if at least one of its parents has a different value + this._parents.forEach((oldValue, parent) => { + const newValue = parent.peek(); + + if (oldValue !== newValue) { + changed = true; + } + }); + } + + if (changed) { + this._hasError = false; + + // Because the dependencies might have changed, we first + // remove all of the old links between this computed and + // its dependencies. + // + // The links will be recreated by the _addDependency method. + this._removeDependencies(); + + const old = currentComputed; + currentComputed = this; + + try { + this._value = this._callback(); + + } finally { + currentComputed = old; + } + } + + } catch (e) { + this._hasError = true; + + // We reuse the _value slot for the error, instead of using a separate property + this._value = e as T; + } + } + + if (this._hasError) { + throw this._value; + + } else { + return this._value; + } + } + + public get value(): T { + const value = this.peek(); + + if (currentComputed !== null) { + // If accessing inside of a computed, add this to the computed's parents + currentComputed._addDependency(this, value); + } + + return value; + } + + public set value(v: T) { + throw new Error("Computed signals are readonly"); + } } -const tmpPending: Signal[] = []; -/** - * Refresh _just_ this signal and its dependencies recursively. - * All other signals will be left untouched and added to the - * global queue to flush later. Since we're traversing "upwards", - * we don't have to car about topological sorting. - */ -function refreshStale(signal: Signal) { - pending.delete(signal); - signal._pending = 0; - signal._updater(); - if (commitError) { - const err = commitError; - commitError = null; - throw err; - } - - signal._subs.forEach(sub => { - if (sub._pending > 0) { - // If PENDING > 1 then we can safely reduce the counter because - // the final sweep will take care of the rest. But if it's - // exactly 1 we can't do that otherwise the sweeping logic - // assumes that this signal was already updated. - if (sub._pending > 1) sub._pending--; - tmpPending.push(sub); - } - }); +export function computed(f: () => T): ReadonlySignal { + return new Computed(f); } -function activate(signal: Signal) { - activating = true; - try { - refreshStale(signal); - } finally { - activating = false; - } -} -ROOT = currentSignal = new Signal(undefined); -ROOT._canActivate = true; +class Effect extends Computed implements ReadonlySignal { + // These property names get minified - see /mangle.json -export function signal(value: T): Signal { - return new Signal(value); -} + /** @internal */ + protected _listener: Listener | null = null; -export type ReadonlySignal = Omit, "value"> & { - readonly value: T; -}; -export function computed(compute: () => T): ReadonlySignal { - const signal = new Signal(undefined as any); - signal._readonly = true; + constructor(callback: () => T) { + super(callback); + } - function updater() { - let finish = signal._setCurrent(); + /** @internal */ + protected _wakeup() { + if (batchPending === null) { + throw new Error("Invalid batchPending"); + } - try { - let ret = compute(); + if (this._listener !== null) { + batchPending!.add(this._listener); + } - finish(signal._value === ret, true); - signal._value = ret; - } catch (err: any) { - // Ensure that we log the first error not the last - if (!commitError) commitError = err; - finish(true, false); - } - } + super._wakeup(); + } - signal._updater = updater; + /** @internal */ + public _listen(callback: (value: T) => void): Disposer { + let oldValue = this.value; - return signal; -} + const listener = () => { + const newValue = this.value; + + if (oldValue !== newValue) { + oldValue = newValue; + callback(oldValue); + } + }; + + this._listener = listener; + + callback(oldValue); -export function effect(callback: () => void) { - const s = computed(() => batch(callback)); - // Set up subscriptions since this is a "reactor" signal - activate(s); - return () => s._setCurrent()(true, true); + return () => { + this._listener = null; + this._removeDependencies(); + }; + } } -export function batch(cb: () => T): T { - batchPending++; - try { - return cb(); - } finally { - // Since stale signals are refreshed upwards, we need to - // add pending signals in reverse - let item: Signal | undefined; - while ((item = tmpPending.pop()) !== undefined) { - pending.add(item); - } - - if (--batchPending === 0) { - sweep(pending); - pending.clear(); - } - } +export function effect(callback: () => void): Disposer { + return new Effect(() => batch(callback))._listen(() => {}); } diff --git a/packages/core/test/signal.test.ts b/packages/core/test/signal.test.ts index 68e633f2a..377009c0a 100644 --- a/packages/core/test/signal.test.ts +++ b/packages/core/test/signal.test.ts @@ -204,6 +204,7 @@ describe("computed()", () => { compute.resetHistory(); a.value = "aa"; + expect(c.value).to.equal("aab"); expect(compute).to.have.been.calledOnce; }); @@ -224,7 +225,7 @@ describe("computed()", () => { compute.resetHistory(); a.value = 4; - + e.value; expect(compute).to.have.been.calledOnce; }); @@ -440,6 +441,7 @@ describe("computed()", () => { spy.resetHistory(); a.value = "aa"; + d.value; expect(spy).to.returned("aa c"); }); @@ -468,6 +470,7 @@ describe("computed()", () => { spy.resetHistory(); a.value = "aa"; + e.value; expect(spy).to.returned("aa c d"); }); }); @@ -494,27 +497,46 @@ describe("computed()", () => { it("should keep graph consistent on errors in computeds", () => { const a = signal(0); - let shouldThrow = false; + const shouldThrow = signal(false); + const b = computed(() => { - if (shouldThrow) throw new Error("fail"); + if (shouldThrow.value) throw new Error("fail"); return a.value; }); + const c = computed(() => b.value); expect(c.value).to.equal(0); - shouldThrow = true; + // Update it so that c will now throw + batch(() => { + shouldThrow.value = true; + a.value = 1; + }); + + // Verify that c throws let error: Error | null = null; try { - a.value = 1; + c.value; } catch (err: any) { error = err; } expect(error?.message).to.equal("fail"); - // Now update signal again without throwing an error. If we didn't - // reset the subtree's PENDING counter C's value wouldn't update. - shouldThrow = false; - a.value = 2; + // Make sure that c always rethrows whenever accessing its value + error = null; + try { + c.value; + } catch (err: any) { + error = err; + } + expect(error?.message).to.equal("fail"); + + // Update it so that c no longer throws + batch(() => { + shouldThrow.value = false; + a.value = 2; + }); + expect(c.value).to.equal(2); }); @@ -522,9 +544,10 @@ describe("computed()", () => { const a = signal(1); const b = signal(1); const c = signal(1); - let shouldThrow = false; + const shouldThrow = signal(false); + const compute = sinon.spy(() => { - if (shouldThrow) { + if (shouldThrow.value) { throw new Error("fail: " + c.value); } return a.value + b.value; @@ -532,30 +555,34 @@ describe("computed()", () => { const d = computed(compute); expect(d.value).to.equal(2); - shouldThrow = true; - expect(() => { + batch(() => { + shouldThrow.value = true; a.value = 2; + }); + + expect(() => { + d.value; }).to.throw(); - expect(d.value).to.equal(2); - // when errors occur, we intentionally over-subscribe. - // This includes retaining subscriptions after the error: + expect(compute).to.have.been.called; + compute.resetHistory(); - try { + + expect(() => { b.value = 2; - } catch (e) { - // may error, but not in a way we can assert over - } - expect(compute).to.have.been.called; + d.value; + }).to.throw(); + + expect(compute).to.have.not.been.called; compute.resetHistory(); - shouldThrow = false; - // Note: b.value=2 should probably also update the subgraph. - // ...but its value is already 2 from the errored computation. - // b.value = 2; - c.value = 2; - expect(compute).to.have.been.called; + + batch(() => { + shouldThrow.value = false; + c.value = 2; + }); expect(d.value).to.equal(4); + expect(compute).to.have.been.called; }); }); }); diff --git a/tsconfig.json b/tsconfig.json index 2fe4efcc5..becb19813 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,6 +1,6 @@ { "compilerOptions": { - "target": "ES2020", + "target": "ES2021", "moduleResolution": "node", "esModuleInterop": true, "strict": true,