-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathindex.ts
97 lines (91 loc) · 2.19 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/**
 * Singly linked list node describing one queued task.
 *
 * `add` creates one node per call and appends it at the tail; `run` takes
 * nodes from the head. In this file `T` is always instantiated as
 * `PromiseLike<...>`, i.e. the type returned by the task function.
 */
type Node<T> = {
/** Task function passed to `add`; `run` invokes it to start the work */
p: () => T
/** Resolves the promise that `add` returned for this task */
res: (value: T) => void
/** Rejects the promise that `add` returned for this task */
rej: (reason: any) => void
/** Next node in the list; unset on the last (tail) node */
next?: Node<T>
}
/**
 * Public interface of the object returned by `newQueue`: a promise queue
 * that starts tasks in insertion order, limited by the concurrency passed
 * to `newQueue`.
 */
export interface Queue {
/**
 * Add an async function / promise wrapper to the queue.
 * The returned promise settles with the outcome of `promiseFunction` once
 * the queue has run it.
 */
add<T>(promiseFunction: () => PromiseLike<T>): Promise<T>
/**
 * Returns a promise that resolves when the queue is empty
 * (already-resolved if nothing is queued or running at call time).
 */
done(): Promise<void>
/**
 * Empties the queue (active promises are not cancelled).
 * Tasks that were still waiting are dropped without being run, so the
 * promises `add` returned for them never settle.
 */
clear(): void
/** Returns the number of promises currently running */
active(): number
/** Returns the total number of promises in the queue (waiting + running) */
size(): number
}
// Local alias for the global Promise constructor.
// Original note: this just saves a few bytes.
let Promize = Promise

/**
 * Creates a new queue with the specified concurrency level.
 *
 * Tasks are started in the order they were added. At most `concurrency`
 * tasks run at the same time; whenever one settles, the next waiting task
 * (if any) is started.
 *
 * A task function that throws synchronously, or that returns a plain value
 * instead of a thenable, is handled like an async function would be: the
 * promise returned by `add` rejects / resolves accordingly and the queue's
 * bookkeeping stays consistent.
 *
 * @param concurrency - The maximum number of concurrent operations.
 * @returns The newly created queue.
 */
export let newQueue = (concurrency: number): Queue => {
  /** Number of tasks that have been started and have not settled yet */
  let active = 0
  /** Number of tasks that are waiting or running */
  let size = 0
  /** Next task to start (front of the linked list) */
  let head: Node<PromiseLike<any>> | undefined | null
  /** Most recently added task (back of the linked list) */
  let tail: Node<PromiseLike<any>> | undefined | null
  /** Resolver of the promise most recently created by `done()` */
  let resolveDonePromise: (value: void | PromiseLike<void>) => void
  /** Pending promise shared by all `done()` callers; unset when none is pending */
  let donePromise: Promise<void> | void

  /** Bookkeeping after a task settled: start the next task or signal "empty". */
  let afterRun = () => {
    active--
    if (--size) {
      run()
    } else {
      // The resolver returns undefined, so this both resolves the pending
      // done-promise (if any) and forgets it, letting the next `done()` call
      // create a fresh one.
      donePromise = resolveDonePromise?.()
    }
  }

  /** Starts the task at the head of the list if a concurrency slot is free. */
  let run = () => {
    if (head && active < concurrency) {
      active++
      let curHead = head
      head = head.next
      let pending: Promise<any>
      try {
        // Promise.resolve returns native promises unchanged and adopts other
        // thenables / plain values, guaranteeing exactly one callback below.
        pending = Promize.resolve(curHead.p())
      } catch (e) {
        // The task function threw synchronously. Convert that into an
        // asynchronous rejection so `add` does not throw and the slot taken
        // by `active++` above is released through `afterRun` as usual.
        pending = Promize.reject(e)
      }
      pending.then(
        (v) => (curHead.res(v), afterRun()),
        (e) => (curHead.rej(e), afterRun())
      )
    }
  }

  return {
    add<T>(p: () => PromiseLike<T>) {
      let node = { p } as Node<PromiseLike<T>>
      // The executor runs synchronously, so `res` / `rej` are set on the node
      // before `run()` can touch it.
      let promise = new Promize((res, rej) => {
        node.res = res
        node.rej = rej
      })
      if (head) {
        tail = tail!.next = node
      } else {
        tail = head = node
      }
      size++
      run()
      return promise as Promise<T>
    },
    done: () => {
      if (!size) {
        return Promize.resolve()
      }
      if (donePromise) {
        return donePromise
      }
      return (donePromise = new Promize((resolve) => (resolveDonePromise = resolve)))
    },
    clear() {
      // Drop every task that has not been started; running tasks are kept
      // and are the only ones still counted in `size`.
      head = tail = null
      size = active
    },
    active: () => active,
    size: () => size,
  }
}