Skip to content

Commit 19c293b

Browse files
committed
add Stream.asyncPush api (#3277)
1 parent f1fc59e commit 19c293b

File tree

9 files changed

+345
-24
lines changed

9 files changed

+345
-24
lines changed

.changeset/forty-beers-refuse.md

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
---
2+
"effect": minor
3+
---
4+
5+
add Stream.asyncPush api
6+
7+
This api creates a stream from an external push-based resource.
8+
9+
You can use the `emit` helper to emit values to the stream. You can also use
10+
the `emit` helper to signal the end of the stream by using apis such as
11+
`emit.end` or `emit.fail`.
12+
13+
By default it uses an "unbounded" buffer size.
14+
You can customize the buffer size and strategy by passing an object as the
15+
second argument with the `bufferSize` and `strategy` fields.
16+
17+
```ts
18+
import { Effect, Stream } from "effect";
19+
20+
Stream.asyncPush<string>(
21+
(emit) =>
22+
Effect.acquireRelease(
23+
Effect.gen(function* () {
24+
yield* Effect.log("subscribing");
25+
return setInterval(() => emit.single("tick"), 1000);
26+
}),
27+
(handle) =>
28+
Effect.gen(function* () {
29+
yield* Effect.log("unsubscribing");
30+
clearInterval(handle);
31+
}),
32+
),
33+
{ bufferSize: 16, strategy: "dropping" },
34+
);
35+
```

.changeset/tricky-cheetahs-help.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"effect": minor
3+
---
4+
5+
add `bufferSize` option to Stream.fromEventListener

packages/effect/src/Stream.ts

+41
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,46 @@ export const asyncEffect: <A, E = never, R = never>(
373373
} | undefined
374374
) => Stream<A, E, R> = internal.asyncEffect
375375

376+
/**
377+
* Creates a stream from an external push-based resource.
378+
*
379+
* You can use the `emit` helper to emit values to the stream. The `emit` helper
380+
* returns a boolean indicating whether the value was emitted or not.
381+
*
382+
* You can also use the `emit` helper to signal the end of the stream by
383+
* using apis such as `emit.end` or `emit.fail`.
384+
*
385+
* By default it uses an "unbounded" buffer size.
386+
* You can customize the buffer size and strategy by passing an object as the
387+
* second argument with the `bufferSize` and `strategy` fields.
388+
*
389+
* @example
390+
* import { Effect, Stream } from "effect"
391+
*
392+
* Stream.asyncPush<string>((emit) =>
393+
* Effect.acquireRelease(
394+
* Effect.gen(function*() {
395+
* yield* Effect.log("subscribing")
396+
* return setInterval(() => emit.single("tick"), 1000)
397+
* }),
398+
* (handle) =>
399+
* Effect.gen(function*() {
400+
* yield* Effect.log("unsubscribing")
401+
* clearInterval(handle)
402+
* })
403+
* ), { bufferSize: 16, strategy: "dropping" })
404+
*
405+
* @since 3.6.0
406+
* @category constructors
407+
*/
408+
export const asyncPush: <A, E = never, R = never>(
409+
register: (emit: Emit.EmitOpsPush<E, A>) => Effect.Effect<unknown, never, R | Scope.Scope>,
410+
options?: { readonly bufferSize: "unbounded" } | {
411+
readonly bufferSize?: number | undefined
412+
readonly strategy?: "dropping" | "sliding" | undefined
413+
} | undefined
414+
) => Stream<A, E, Exclude<R, Scope.Scope>> = internal.asyncPush
415+
376416
/**
377417
* Creates a stream from an asynchronous callback that can be called multiple
378418
* times. The registration of the callback itself returns an a scoped
@@ -5955,5 +5995,6 @@ export const fromEventListener: <A = unknown>(
59555995
readonly capture?: boolean
59565996
readonly passive?: boolean
59575997
readonly once?: boolean
5998+
readonly bufferSize?: number | "unbounded" | undefined
59585999
} | undefined
59596000
) => Stream<A> = internal.fromEventListener

packages/effect/src/StreamEmit.ts

+53
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,56 @@ export interface EmitOps<in R, in E, in A, out B> {
8181
*/
8282
single(value: A): Promise<B>
8383
}
84+
85+
/**
86+
* @since 3.6.0
87+
* @category models
88+
*/
89+
export interface EmitOpsPush<in E, in A> {
90+
/**
91+
* Emits a chunk containing the specified values.
92+
*/
93+
chunk(chunk: Chunk.Chunk<A>): boolean
94+
95+
/**
96+
* Emits a chunk containing the specified values.
97+
*/
98+
array(chunk: ReadonlyArray<A>): boolean
99+
100+
/**
101+
* Terminates with a cause that dies with the specified defect.
102+
*/
103+
die<Err>(defect: Err): void
104+
105+
/**
106+
* Terminates with a cause that dies with a `Throwable` with the specified
107+
* message.
108+
*/
109+
dieMessage(message: string): void
110+
111+
/**
112+
* Either emits the specified value if this `Exit` is a `Success` or else
113+
* terminates with the specified cause if this `Exit` is a `Failure`.
114+
*/
115+
done(exit: Exit.Exit<A, E>): void
116+
117+
/**
118+
* Terminates with an end of stream signal.
119+
*/
120+
end(): void
121+
122+
/**
123+
* Terminates with the specified error.
124+
*/
125+
fail(error: E): void
126+
127+
/**
128+
* Terminates the stream with the specified cause.
129+
*/
130+
halt(cause: Cause.Cause<E>): void
131+
132+
/**
133+
* Emits a chunk containing the specified value.
134+
*/
135+
single(value: A): boolean
136+
}

packages/effect/src/internal/stream.ts

+52-18
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import * as Either from "../Either.js"
1010
import * as Equal from "../Equal.js"
1111
import * as Exit from "../Exit.js"
1212
import * as Fiber from "../Fiber.js"
13+
import * as FiberRef from "../FiberRef.js"
1314
import type { LazyArg } from "../Function.js"
1415
import { constTrue, dual, identity, pipe } from "../Function.js"
1516
import * as Layer from "../Layer.js"
@@ -597,6 +598,51 @@ export const asyncEffect = <A, E = never, R = never>(
597598
fromChannel
598599
)
599600

601+
const queueFromBufferOptionsPush = <A, E>(
602+
options?: { readonly bufferSize: "unbounded" } | {
603+
readonly bufferSize?: number | undefined
604+
readonly strategy?: "dropping" | "sliding" | undefined
605+
} | undefined
606+
): Effect.Effect<Queue.Queue<Array<A> | Exit.Exit<void, E>>> => {
607+
if (options?.bufferSize === "unbounded" || (options?.bufferSize === undefined && options?.strategy === undefined)) {
608+
return Queue.unbounded()
609+
}
610+
switch (options?.strategy) {
611+
case "sliding":
612+
return Queue.sliding(options.bufferSize ?? 16)
613+
default:
614+
return Queue.dropping(options?.bufferSize ?? 16)
615+
}
616+
}
617+
618+
/** @internal */
619+
export const asyncPush = <A, E = never, R = never>(
620+
register: (emit: Emit.EmitOpsPush<E, A>) => Effect.Effect<unknown, never, R | Scope.Scope>,
621+
options?: {
622+
readonly bufferSize: "unbounded"
623+
} | {
624+
readonly bufferSize?: number | undefined
625+
readonly strategy?: "dropping" | "sliding" | undefined
626+
} | undefined
627+
): Stream.Stream<A, E, Exclude<R, Scope.Scope>> =>
628+
Effect.acquireRelease(
629+
queueFromBufferOptionsPush<A, E>(options),
630+
Queue.shutdown
631+
).pipe(
632+
Effect.tap((queue) =>
633+
FiberRef.getWith(FiberRef.currentScheduler, (scheduler) => register(emit.makePush(queue, scheduler)))
634+
),
635+
Effect.map((queue) => {
636+
const loop: Channel.Channel<Chunk.Chunk<A>, unknown, E> = core.flatMap(Queue.take(queue), (item) =>
637+
Exit.isExit(item)
638+
? Exit.isSuccess(item) ? core.void : core.failCause(item.cause)
639+
: channel.zipRight(core.write(Chunk.unsafeFromArray(item)), loop))
640+
return loop
641+
}),
642+
channel.unwrapScoped,
643+
fromChannel
644+
)
645+
600646
/** @internal */
601647
export const asyncScoped = <A, E = never, R = never>(
602648
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R | Scope.Scope>,
@@ -8341,23 +8387,11 @@ export const fromEventListener = <A = unknown>(
83418387
readonly capture?: boolean
83428388
readonly passive?: boolean
83438389
readonly once?: boolean
8390+
readonly bufferSize?: number | "unbounded" | undefined
83448391
} | undefined
83458392
): Stream.Stream<A> =>
8346-
_async<A>((emit) => {
8347-
let batch: Array<A> = []
8348-
let taskRunning = false
8349-
function cb(e: A) {
8350-
batch.push(e)
8351-
if (!taskRunning) {
8352-
taskRunning = true
8353-
queueMicrotask(() => {
8354-
const events = batch
8355-
batch = []
8356-
taskRunning = false
8357-
emit.chunk(Chunk.unsafeFromArray(events))
8358-
})
8359-
}
8360-
}
8361-
target.addEventListener(type, cb as any, options)
8362-
return Effect.sync(() => target.removeEventListener(type, cb, options))
8363-
}, "unbounded")
8393+
asyncPush<A>((emit) =>
8394+
Effect.acquireRelease(
8395+
Effect.sync(() => target.addEventListener(type, emit.single as any, options)),
8396+
() => Effect.sync(() => target.removeEventListener(type, emit.single, options))
8397+
), { bufferSize: typeof options === "object" ? options.bufferSize : undefined })

packages/effect/src/internal/stream/emit.ts

+77
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import * as Effect from "../../Effect.js"
44
import * as Exit from "../../Exit.js"
55
import { pipe } from "../../Function.js"
66
import * as Option from "../../Option.js"
7+
import type * as Queue from "../../Queue.js"
8+
import type * as Scheduler from "../../Scheduler.js"
79
import type * as Emit from "../../StreamEmit.js"
810

911
/** @internal */
@@ -44,3 +46,78 @@ export const make = <R, E, A, B>(
4446
}
4547
return Object.assign(emit, ops)
4648
}
49+
50+
/** @internal */
51+
export const makePush = <E, A>(
52+
queue: Queue.Queue<Array<A> | Exit.Exit<void, E>>,
53+
scheduler: Scheduler.Scheduler
54+
): Emit.EmitOpsPush<E, A> => {
55+
let finished = false
56+
let buffer: Array<A> = []
57+
let running = false
58+
function array(items: ReadonlyArray<A>) {
59+
if (finished) return false
60+
if (items.length <= 50_000) {
61+
buffer.push.apply(buffer, items as Array<A>)
62+
} else {
63+
for (let i = 0; i < items.length; i++) {
64+
buffer.push(items[0])
65+
}
66+
}
67+
if (!running) {
68+
running = true
69+
scheduler.scheduleTask(flush, 0)
70+
}
71+
return true
72+
}
73+
function flush() {
74+
running = false
75+
if (buffer.length > 0) {
76+
queue.unsafeOffer(buffer)
77+
buffer = []
78+
}
79+
}
80+
function done(exit: Exit.Exit<A, E>) {
81+
if (finished) return
82+
finished = true
83+
if (exit._tag === "Success") {
84+
buffer.push(exit.value)
85+
}
86+
flush()
87+
queue.unsafeOffer(exit._tag === "Success" ? Exit.void : exit)
88+
}
89+
return {
90+
single(value: A) {
91+
if (finished) return false
92+
buffer.push(value)
93+
if (!running) {
94+
running = true
95+
scheduler.scheduleTask(flush, 0)
96+
}
97+
return true
98+
},
99+
array,
100+
chunk(chunk) {
101+
return array(Chunk.toReadonlyArray(chunk))
102+
},
103+
done,
104+
end() {
105+
if (finished) return
106+
finished = true
107+
flush()
108+
queue.unsafeOffer(Exit.void)
109+
},
110+
halt(cause: Cause.Cause<E>) {
111+
return done(Exit.failCause(cause))
112+
},
113+
fail(error: E) {
114+
return done(Exit.fail(error))
115+
},
116+
die<Err>(defect: Err): void {
117+
return done(Exit.die(defect))
118+
},
119+
dieMessage(message: string): void {
120+
return done(Exit.die(new Error(message)))
121+
}
122+
}
123+
}

packages/effect/test/Stream/async.test.ts

+56
Original file line numberDiff line numberDiff line change
@@ -406,4 +406,60 @@ describe("Stream", () => {
406406
yield* $(Fiber.interrupt(fiber), Effect.exit)
407407
assert.isFalse(result)
408408
}))
409+
410+
it.effect("asyncPush", () =>
411+
Effect.gen(function*() {
412+
const array = [1, 2, 3, 4, 5]
413+
const latch = yield* Deferred.make<void>()
414+
const fiber = yield* Stream.asyncPush<number>((emit) => {
415+
array.forEach((n) => {
416+
emit.single(n)
417+
})
418+
return pipe(
419+
Deferred.succeed(latch, void 0),
420+
Effect.asVoid
421+
)
422+
}).pipe(
423+
Stream.take(array.length),
424+
Stream.run(Sink.collectAll()),
425+
Effect.fork
426+
)
427+
yield* Deferred.await(latch)
428+
const result = yield* Fiber.join(fiber)
429+
assert.deepStrictEqual(Array.from(result), array)
430+
}))
431+
432+
it.effect("asyncPush - signals the end of the stream", () =>
433+
Effect.gen(function*() {
434+
const result = yield* Stream.asyncPush<number>((emit) => {
435+
emit.end()
436+
return Effect.void
437+
}).pipe(Stream.runCollect)
438+
assert.isTrue(Chunk.isEmpty(result))
439+
}))
440+
441+
it.effect("asyncPush - handles errors", () =>
442+
Effect.gen(function*() {
443+
const error = new Cause.RuntimeException("boom")
444+
const result = yield* Stream.asyncPush<number, Cause.RuntimeException>((emit) => {
445+
emit.fail(error)
446+
return Effect.void
447+
}).pipe(
448+
Stream.runCollect,
449+
Effect.exit
450+
)
451+
assert.deepStrictEqual(result, Exit.fail(error))
452+
}))
453+
454+
it.effect("asyncPush - handles defects", () =>
455+
Effect.gen(function*() {
456+
const error = new Cause.RuntimeException("boom")
457+
const result = yield* Stream.asyncPush<number, Cause.RuntimeException>(() => {
458+
throw error
459+
}).pipe(
460+
Stream.runCollect,
461+
Effect.exit
462+
)
463+
assert.deepStrictEqual(result, Exit.die(error))
464+
}))
409465
})

0 commit comments

Comments
 (0)