-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathhub.js
102 lines (89 loc) · 2.77 KB
/
hub.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
"use strict";
var reduce = require("reducible/reduce")
var reduced = require("reducible/reduced")
var isReduced = require("reducible/is-reduced")
var end = require("reducible/end")
var input = "input@" + module.id
var consumers = "consumers@" + module.id
var isArray = Array.isArray
function Hub(source) {
this[input] = source
this[consumers] = []
}
reduce.define(Hub, function reduceHub(hub, next, initial) {
// Enqueue new consumer into consumers array so that new
// values will be delegated to it.
hub[consumers].push({ next: next, state: initial })
// If source is not in the process of consumption than
// start it up.
if (!isOpen(hub)) open(hub)
})
function drain(consumers) {
while (consumers.length) {
var count = consumers.length
var index = 0
while (index < count) {
var consumer = consumers[index]
consumer.next(end, consumer.state)
index = index + 1
}
consumers.splice(0, count)
}
}
function dispatch(consumers, value) {
var count = consumers.length
var index = 0
while (index < count) {
var consumer = consumers[index]
var state = consumer.next(value, consumer.state)
// If consumer has finished accumulation remove it from the consumers
// list. And dispatch end of stream on it (maybe that should not be
// necessary).
if (isReduced(state)) {
consumers.splice(index, 1)
consumer.next(end, state.value)
// If consumer is removed than we decrease count as consumers array
// will contain less elements (unless of course more elements were
// added but we would like to ignore those).
count = count - 1
} else {
consumer.state = state
index = index + 1
}
}
}
function open(hub) {
var source = hub[input]
var reducers = hub[consumers]
hub[input] = null // mark hub as open
reduce(source, function distribute(value) {
// If it's end of the source we close all the reducers including
// ones that subscribe as side effect.
if (value === end) drain(reducers)
// otherwise we dispatch value to all the registered reducers.
else dispatch(reducers, value)
// reducers will be empty if either source is drained or if all the
// reducers finished reductions. Either way we reset input back to
// source and return `reduced` marker to stop the reduction of
// source.
if (reducers.length === 0) {
hub[input] = source
return reduced()
}
})
}
function isOpen(hub) {
return hub[input] === null
}
function hub(source) {
/**
Take a reducible `source`, such as a `signal` and return a reducible that can
be consumed by many reducers.
**/
if (source === null) return null
if (source === void(0)) return void(0)
return new Hub(source)
}
hub.isOpen = isOpen
hub.type = Hub
module.exports = hub