Skip to content

Commit

Permalink
refactor: consolidate node scheduling logic
Browse files Browse the repository at this point in the history
@affects atoms, immer, machines, stores
  • Loading branch information
bowheart committed Feb 12, 2025
1 parent f82b5e4 commit 90208de
Show file tree
Hide file tree
Showing 13 changed files with 75 additions and 116 deletions.
8 changes: 8 additions & 0 deletions packages/atoms/src/classes/Ecosystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1029,6 +1029,14 @@ export class Ecosystem<Context extends Record<string, any> | undefined = any>
this._refCount++
}

/**
* `a`ddJob - add a job to the scheduler without scheduling. The scheduler
* should be already either running or scheduled when calling this.
*/
public a(job: Job) {
this._scheduler.insertJob(job)
}

/**
* @see Job.j
*/
Expand Down
34 changes: 16 additions & 18 deletions packages/atoms/src/classes/GraphNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -352,12 +352,19 @@ export abstract class GraphNode<G extends NodeGenerics = AnyNodeGenerics>
public abstract p: G['Params']

/**
* `r`un - evaluate the graph node. If its value "changes", this in turn runs
* this node's dependents, recursing down the graph tree.
*
* If `defer` is true, make the scheduler set a timeout to run the evaluation.
*/
public abstract r(reason: InternalEvaluationReason, defer?: boolean): void
* `r`un - schedule an evaluation of the graph node. If its value "changes",
* this in turn schedules this node's observers, recursing down the graph
* tree.
*/
public r(reason: InternalEvaluationReason) {
// don't schedule if destroyed and ignore `EventSent` reasons TODO: Any
// calls when destroyed probably indicate a memory leak on the user's part.
// Notify them. TODO: Can we pause evaluations while status is Stale (and
// should we just always evaluate once when waking up a stale node)?
this.l === DESTROYED ||
reason.t === EventSent ||
(this.w.push(reason) === 1 && this.e.a(this))
}

/**
* `s`ources - a map of the edges drawn between this node and all of its
Expand Down Expand Up @@ -511,15 +518,6 @@ export class ExternalNode<
this.destroy()
}

/**
* @see GraphNode.r
*/
public r(reason: InternalEvaluationReason, defer?: boolean) {
// ignore `EventSent` reasons
reason.t === EventSent ||
(this.w.push(reason) === 1 && this.e._scheduler.schedule(this, defer))
}

/**
* `u`pdateEdge - ExternalNodes maintain a single edge on a source node. But
* the source can change. Call this to update it if needed.
Expand Down Expand Up @@ -621,14 +619,14 @@ export class Listener<
/**
* @see ExternalNode.r
*/
public r(reason: InternalEvaluationReason, defer?: boolean) {
public r(reason: InternalEvaluationReason) {
const { e, w } = this
const shouldSchedule = shouldScheduleImplicit(this, reason)

// schedule the job if needed. If not scheduling, kill this listener now if
// its source is destroyed.
shouldSchedule
? w.push(reason) === 1 && e._scheduler.schedule(this, defer)
shouldSchedule && this.l !== DESTROYED
? w.push(reason) === 1 && e.a(this)
: this.i?.l === DESTROYED && this.k(this.i)
}
}
6 changes: 2 additions & 4 deletions packages/atoms/src/classes/MappedSignal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,9 @@ export class MappedSignal<
/**
* @see Signal.r
*/
public r(reason: InternalEvaluationReason, defer?: boolean) {
public r(reason: InternalEvaluationReason) {
if (reason.t !== EventSent) {
if (this.w.push(reason) === 1) {
this.e._scheduler.schedule(this, defer)
}
super.r(reason)

if (reason.s) {
this.N ??= { ...this.v }
Expand Down
48 changes: 24 additions & 24 deletions packages/atoms/src/classes/Scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,28 @@ export class Scheduler implements SchedulerInterface {
this.runJobs()
}

/**
* Schedule an EvaluateGraphNode (2) or UpdateExternalDependent (3) job
*/
public insertJob(newJob: Job) {
const weight = newJob.W ?? 0

const index = this.findIndex(job => {
if (job.T !== newJob.T) return +(newJob.T - job.T > 0) || -1 // 1 or -1

// EvaluateGraphNode (2) and UpdateExternalDependent (3) jobs use weight
// comparison. `W` will always be defined here. TODO: use discriminated
// union types to reflect this
return weight < job.W! ? -1 : +(weight > job.W!) // + = 0 or 1
})

if (index === -1) {
this.jobs.push(newJob)
} else {
this.jobs.splice(index, 0, newJob)
}
}

/**
* Call after any operation that may have nested flush attempts. This in
* itself _is_ a flush attempt so whatever calls may also need wrapping in
Expand Down Expand Up @@ -115,11 +137,11 @@ export class Scheduler implements SchedulerInterface {
* IMPORTANT: Setting and clearing timeouts is expensive. We need to always
* pass `shouldSetTimeout: false` when we're going to immediately flush
*/
public schedule(newJob: Job, shouldSetTimeout = true) {
public schedule(newJob: Job) {
this.insertJob(newJob)

// we just pushed the first job onto the queue
if (shouldSetTimeout && this.jobs.length === 1 && !this._isRunning) {
if (this.jobs.length === 1 && !this._isRunning) {
this.setTimeout()
}
}
Expand Down Expand Up @@ -202,28 +224,6 @@ export class Scheduler implements SchedulerInterface {
return this.findIndex(cb, newIndex, iteration + 1)
}

/**
* Schedule an EvaluateGraphNode (2) or UpdateExternalDependent (3) job
*/
private insertJob(newJob: Job) {
const weight = newJob.W ?? 0

const index = this.findIndex(job => {
if (job.T !== newJob.T) return +(newJob.T - job.T > 0) || -1 // 1 or -1

// EvaluateGraphNode (2) and UpdateExternalDependent (3) jobs use weight
// comparison. `W` will always be defined here. TODO: use discriminated
// union types to reflect this
return weight < job.W! ? -1 : +(weight > job.W!) // + = 0 or 1
})

if (index === -1) {
this.jobs.push(newJob)
} else {
this.jobs.splice(index, 0, newJob)
}
}

/**
* Run either all "full" jobs or all "now" jobs. Since the jobs are split, we
* can essentially have two schedulers running at once. "Now" jobs must always
Expand Down
8 changes: 0 additions & 8 deletions packages/atoms/src/classes/SelectorInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import {
AtomSelectorConfig,
AtomSelectorOrConfig,
DehydrationFilter,
InternalEvaluationReason,
SelectorGenerics,
} from '../types/index'
import {
Expand Down Expand Up @@ -191,11 +190,4 @@ export class SelectorInstance<
public m() {
this.destroy()
}

/**
* @see GraphNode.r
*/
public r(reason: InternalEvaluationReason, defer?: boolean) {
this.w.push(reason) === 1 && this.e._scheduler.schedule(this, defer)
}
}
7 changes: 0 additions & 7 deletions packages/atoms/src/classes/Signal.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Settable } from '@zedux/core'
import {
AtomGenerics,
InternalEvaluationReason,
Mutatable,
NodeGenerics,
SendableEvents,
Expand Down Expand Up @@ -263,10 +262,4 @@ export class Signal<
public m() {
this.destroy()
}

/**
* @see GraphNode.r
*/
// eslint-disable-next-line @typescript-eslint/no-unused-vars
public r(reason: InternalEvaluationReason, defer?: boolean) {}
}
14 changes: 3 additions & 11 deletions packages/atoms/src/classes/instances/AtomInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import {
} from '@zedux/atoms/types/index'
import {
ACTIVE,
DESTROYED,
ERROR,
EventSent,
INITIALIZING,
Expand Down Expand Up @@ -284,7 +283,7 @@ export class AtomInstance<
*/
public invalidate() {
const reason = { s: this, t: Invalidate } as const
this.r(reason, false)
this.r(reason)

if (isListeningTo(this.e, INVALIDATE)) {
sendEcosystemEvent(this.e, { source: this, type: INVALIDATE })
Expand Down Expand Up @@ -549,7 +548,7 @@ export class AtomInstance<
/**
* @see Signal.r
*/
public r(reason: InternalEvaluationReason, defer?: boolean) {
public r(reason: InternalEvaluationReason) {
if (reason.t === EventSent) {
// forward events from `this.S`ignal to observers of this atom instance.
// Ignore events from other sources (shouldn't happen, but either way
Expand All @@ -561,14 +560,7 @@ export class AtomInstance<
return
}

// TODO: Any calls in this case probably indicate a memory leak on the
// user's part. Notify them. TODO: Can we pause evaluations while
// status is Stale (and should we just always evaluate once when
// waking up a stale atom)?
if (this.l !== DESTROYED && this.w.push(reason) === 1) {
// refCount just hit 1; we haven't scheduled a job for this node yet
this.e._scheduler.schedule(this, defer)
}
super.r(reason)

if (reason.s && reason.s === this.S && reason.e) {
// when `this.S`ignal gives us events along with a state update, subsume
Expand Down
26 changes: 13 additions & 13 deletions packages/atoms/src/utils/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,6 @@ import {
} from '../types'
import { ERROR, EventSent, makeReasonReadable } from './general'

export const shouldScheduleImplicit = (
node: { C: Record<string, number> },
reason: InternalEvaluationReason
) => {
reason.f || (reason.t !== EventSent && makeReasonReadable(reason))

return (
// '' = the catch-all listener
node.C[''] ||
Object.keys(reason.f ?? reason.e ?? {}).some(key => node.C[key])
)
}

export const isListeningTo = (
ecosystem: Ecosystem,
eventName: keyof EcosystemEvents
Expand Down Expand Up @@ -64,6 +51,19 @@ export const sendImplicitEcosystemEvent = (
ecosystem._scheduler.schedule(ecosystem)
}

export const shouldScheduleImplicit = (
node: { C: Record<string, number> },
reason: InternalEvaluationReason
) => {
reason.f || (reason.t !== EventSent && makeReasonReadable(reason))

return (
// '' = the catch-all listener
node.C[''] ||
Object.keys(reason.f ?? reason.e ?? {}).some(key => node.C[key])
)
}

export const parseOnArgs = (
eventNameOrCallback: PropertyKey | ((eventMap: any) => void),
callbackOrConfig?: SingleEventListener<any, any> | ListenerConfig,
Expand Down
6 changes: 3 additions & 3 deletions packages/atoms/src/utils/graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ export const scheduleDependents = (
}
) => {
for (const [observer, edge] of reason.s.o) {
edge.flags & Static || observer.r(reason, false)
edge.flags & Static || observer.r(reason)
}
}

Expand All @@ -212,7 +212,7 @@ export const scheduleEventListeners = (
const pre = reason.s.e._scheduler.pre()

for (const [observer, edge] of reason.s.o) {
edge.flags & Eventless || observer.r(reason, false)
edge.flags & Eventless || observer.r(reason)
}

reason.s.e._scheduler.post(pre)
Expand All @@ -230,7 +230,7 @@ export const scheduleStaticDependents = (
const pre = reason.s.e._scheduler.pre()

for (const observer of reason.s.o.keys()) {
observer.r(reason, false)
observer.r(reason)
}

reason.s.e._scheduler.post(pre)
Expand Down
2 changes: 1 addition & 1 deletion packages/immer/src/injectImmerStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const doSubscribe = <State>(
return
}

instance.r({ o: oldState }, false)
instance.r({ o: oldState })

// run the scheduler synchronously after any store update
if (action?.meta !== zeduxTypes.batch) {
Expand Down
2 changes: 1 addition & 1 deletion packages/machines/src/injectMachineStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ export const injectMachineStore: <
return
}

instance.r({ o: oldState }, false)
instance.r({ o: oldState })

// run the scheduler synchronously after any store update
if (action?.meta !== zeduxTypes.batch) {
Expand Down
25 changes: 0 additions & 25 deletions packages/stores/src/AtomInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ import {
import {
AtomInstance as NewAtomInstance,
Ecosystem,
ExportsInfusedSetter,
PromiseState,
InternalEvaluationReason,
Transaction,
zi,
SendableEvents,
Expand Down Expand Up @@ -304,29 +302,6 @@ export class AtomInstance<
}
}

/**
* @see NewAtomInstance.r
*/
public r(reason: InternalEvaluationReason, shouldSetTimeout?: boolean) {
// TODO: Any calls in this case probably indicate a memory leak on the
// user's part. Notify them. TODO: Can we pause evaluations while
// status is Stale (and should we just always evaluate once when
// waking up a stale atom)?
if (this.l !== zi.D && this.w.push(reason) === 1) {
// refCount just hit 1; we haven't scheduled a job for this node yet
this.e._scheduler.schedule(this, shouldSetTimeout)
}
}

public _set?: ExportsInfusedSetter<G['State'], G['Exports']>
public get _infusedSetter() {
if (this._set) return this._set
const setState: any = (settable: any, meta?: any) =>
this.setState(settable, meta)

return (this._set = Object.assign(setState, this.exports))
}

/**
* A standard atom's value can be one of:
*
Expand Down
5 changes: 4 additions & 1 deletion packages/stores/src/injectStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ export const doSubscribe = <State>(

const isBatch = action?.meta === zeduxTypes.batch

instance.r({ n, o }, isBatch)
instance.w.push({ n, o }) === 1 &&
(isBatch
? instance.e._scheduler.schedule(instance)
: instance.e.a(instance))

// run the scheduler synchronously after every store update unless batching
if (!isBatch) {
Expand Down

0 comments on commit 90208de

Please # to comment.