diff --git a/docs/api/streams.md b/docs/api/streams.md index cf404ed6914631..816b94dc8d7d00 100644 --- a/docs/api/streams.md +++ b/docs/api/streams.md @@ -56,6 +56,45 @@ const stream = new ReadableStream({ When using a direct `ReadableStream`, all chunk queueing is handled by the destination. The consumer of the stream receives exactly what is passed to `controller.write()`, without any encoding or modification. +## Async generator streams + +Bun also supports async generator functions as a source for `Response` and `Request`. This is an easy way to create a `ReadableStream` that fetches data from an asynchronous source. + +```ts +const response = new Response(async function* () { + yield "hello"; + yield "world"; +}()); + +await response.text(); // "helloworld" +``` + +You can also use `[Symbol.asyncIterator]` directly. + +```ts +const response = new Response({ + [Symbol.asyncIterator]: async function* () { + yield "hello"; + yield "world"; + }, +}); + +await response.text(); // "helloworld" +``` + +If you need more granular control over the stream, `yield` will return the direct ReadableStream controller. + +```ts +const response = new Response({ + [Symbol.asyncIterator]: async function* () { + const controller = yield "hello"; + await controller.end(); + }, +}); + +await response.text(); // "hello" +``` + ## `Bun.ArrayBufferSink` The `Bun.ArrayBufferSink` class is a fast incremental writer for constructing an `ArrayBuffer` of unknown size. diff --git a/src/bun.js/bindings/ZigGlobalObject.cpp b/src/bun.js/bindings/ZigGlobalObject.cpp index a58b0194e9ff24..7fa7546ae187a3 100644 --- a/src/bun.js/bindings/ZigGlobalObject.cpp +++ b/src/bun.js/bindings/ZigGlobalObject.cpp @@ -1,10 +1,10 @@ + #include "root.h" #include "ZigGlobalObject.h" #include #include "helpers.h" #include "BunClientData.h" - -#include "JavaScriptCore/AggregateError.h" +#include "JavaScriptCore/JSObjectInlines.h" #include "JavaScriptCore/InternalFieldTuple.h" #include "JavaScriptCore/BytecodeIndex.h" #include "JavaScriptCore/CallFrameInlines.h" @@ -24,7 +24,6 @@ #include "JavaScriptCore/IteratorOperations.h" #include "JavaScriptCore/JSArray.h" #include "JavaScriptCore/JSGlobalProxyInlines.h" - #include "JavaScriptCore/JSCallbackConstructor.h" #include "JavaScriptCore/JSCallbackObject.h" #include "JavaScriptCore/JSCast.h" @@ -2311,19 +2310,69 @@ extern "C" bool ReadableStream__isLocked(JSC__JSValue possibleReadableStream, Zi return stream != nullptr && ReadableStream::isLocked(globalObject, stream); } -extern "C" int32_t ReadableStreamTag__tagged(Zig::GlobalObject* globalObject, JSC__JSValue possibleReadableStream, JSValue* ptr); -extern "C" int32_t ReadableStreamTag__tagged(Zig::GlobalObject* globalObject, JSC__JSValue possibleReadableStream, JSValue* ptr) +extern "C" int32_t ReadableStreamTag__tagged(Zig::GlobalObject* globalObject, JSC__JSValue* possibleReadableStream, JSValue* ptr) { ASSERT(globalObject); - JSC::JSObject* object = JSValue::decode(possibleReadableStream).getObject(); - if (!object || !object->inherits()) { + JSC::JSObject* object = JSValue::decode(*possibleReadableStream).getObject(); + if (!object) { *ptr = JSC::JSValue(); return -1; } - auto* readableStream = jsCast(object); auto& vm = globalObject->vm(); - auto& builtinNames = WebCore::clientData(vm)->builtinNames(); + const auto& builtinNames = WebCore::builtinNames(vm); + + if (!object->inherits()) { + auto throwScope = DECLARE_THROW_SCOPE(vm); + JSValue target = object; + JSValue fn = JSValue(); + auto* function = jsDynamicCast(object); + if (function && function->jsExecutable() && function->jsExecutable()->isAsyncGenerator()) { + fn = object; + target = jsUndefined(); + } else if (auto iterable = object->getIfPropertyExists(globalObject, vm.propertyNames->asyncIteratorSymbol)) { + if (iterable.isCallable()) { + fn = iterable; + } + } + + if (UNLIKELY(throwScope.exception())) { + *ptr = JSC::JSValue(); + return -1; + } + + if (fn.isEmpty()) { + *ptr = JSC::JSValue(); + return -1; + } + + auto* createIterator = globalObject->builtinInternalFunctions().readableStreamInternals().m_readableStreamFromAsyncIteratorFunction.get(); + + JSC::MarkedArgumentBuffer arguments; + arguments.append(target); + arguments.append(fn); + + JSC::JSValue result = profiledCall(globalObject, JSC::ProfilingReason::API, createIterator, JSC::getCallData(createIterator), JSC::jsUndefined(), arguments); + + if (UNLIKELY(throwScope.exception())) { + return -1; + } + + if (!result.isObject()) { + *ptr = JSC::JSValue(); + return -1; + } + + object = result.getObject(); + + ASSERT(object->inherits()); + *possibleReadableStream = JSValue::encode(object); + *ptr = JSValue(); + ensureStillAliveHere(object); + return 0; + } + + auto* readableStream = jsCast(object); int32_t num = 0; if (JSValue numberValue = readableStream->getDirect(vm, builtinNames.bunNativeTypePrivateName())) { diff --git a/src/bun.js/bindings/bindings.cpp b/src/bun.js/bindings/bindings.cpp index 32db72842aa7c7..da91128e756ce3 100644 --- a/src/bun.js/bindings/bindings.cpp +++ b/src/bun.js/bindings/bindings.cpp @@ -4716,6 +4716,7 @@ enum class BuiltinNamesMap : uint8_t { toString, redirect, inspectCustom, + asyncIterator, }; static JSC::Identifier builtinNameMap(JSC::JSGlobalObject* globalObject, unsigned char name) @@ -4753,6 +4754,9 @@ static JSC::Identifier builtinNameMap(JSC::JSGlobalObject* globalObject, unsigne case BuiltinNamesMap::inspectCustom: { return Identifier::fromUid(vm.symbolRegistry().symbolForKey("nodejs.util.inspect.custom"_s)); } + case BuiltinNamesMap::asyncIterator: { + return vm.propertyNames->asyncIteratorSymbol; + } } } @@ -5410,22 +5414,22 @@ extern "C" bool JSGlobalObject__hasException(JSC::JSGlobalObject* globalObject) return DECLARE_CATCH_SCOPE(globalObject->vm()).exception() != 0; } -CPP_DECL bool JSC__GetterSetter__isGetterNull(JSC__GetterSetter *gettersetter) +CPP_DECL bool JSC__GetterSetter__isGetterNull(JSC__GetterSetter* gettersetter) { return gettersetter->isGetterNull(); } -CPP_DECL bool JSC__GetterSetter__isSetterNull(JSC__GetterSetter *gettersetter) +CPP_DECL bool JSC__GetterSetter__isSetterNull(JSC__GetterSetter* gettersetter) { return gettersetter->isSetterNull(); } -CPP_DECL bool JSC__CustomGetterSetter__isGetterNull(JSC__CustomGetterSetter *gettersetter) +CPP_DECL bool JSC__CustomGetterSetter__isGetterNull(JSC__CustomGetterSetter* gettersetter) { return gettersetter->getter() == nullptr; } -CPP_DECL bool JSC__CustomGetterSetter__isSetterNull(JSC__CustomGetterSetter *gettersetter) +CPP_DECL bool JSC__CustomGetterSetter__isSetterNull(JSC__CustomGetterSetter* gettersetter) { return gettersetter->setter() == nullptr; } diff --git a/src/bun.js/bindings/bindings.zig b/src/bun.js/bindings/bindings.zig index 2ac7f25f6024a4..43039ddff657de 100644 --- a/src/bun.js/bindings/bindings.zig +++ b/src/bun.js/bindings/bindings.zig @@ -4541,6 +4541,7 @@ pub const JSValue = enum(JSValueReprInt) { toString, redirect, inspectCustom, + asyncIterator, }; // intended to be more lightweight than ZigString diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index e7fa8b4d2e60e2..1ed29b13825ed7 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -241,7 +241,7 @@ pub const ReadableStream = struct { Bytes: *ByteStream, }; - extern fn ReadableStreamTag__tagged(globalObject: *JSGlobalObject, possibleReadableStream: JSValue, ptr: *JSValue) Tag; + extern fn ReadableStreamTag__tagged(globalObject: *JSGlobalObject, possibleReadableStream: *JSValue, ptr: *JSValue) Tag; extern fn ReadableStream__isDisturbed(possibleReadableStream: JSValue, globalObject: *JSGlobalObject) bool; extern fn ReadableStream__isLocked(possibleReadableStream: JSValue, globalObject: *JSGlobalObject) bool; extern fn ReadableStream__empty(*JSGlobalObject) JSC.JSValue; @@ -269,41 +269,42 @@ pub const ReadableStream = struct { pub fn fromJS(value: JSValue, globalThis: *JSGlobalObject) ?ReadableStream { JSC.markBinding(@src()); var ptr = JSValue.zero; - return switch (ReadableStreamTag__tagged(globalThis, value, &ptr)) { + var out = value; + return switch (ReadableStreamTag__tagged(globalThis, &out, &ptr)) { .JavaScript => ReadableStream{ - .value = value, + .value = out, .ptr = .{ .JavaScript = {}, }, }, .Blob => ReadableStream{ - .value = value, + .value = out, .ptr = .{ .Blob = ptr.asPtr(ByteBlobLoader), }, }, .File => ReadableStream{ - .value = value, + .value = out, .ptr = .{ .File = ptr.asPtr(FileReader), }, }, .Bytes => ReadableStream{ - .value = value, + .value = out, .ptr = .{ .Bytes = ptr.asPtr(ByteStream), }, }, // .HTTPRequest => ReadableStream{ - // .value = value, + // .value = out, // .ptr = .{ // .HTTPRequest = ptr.asPtr(HTTPRequest), // }, // }, // .HTTPSRequest => ReadableStream{ - // .value = value, + // .value = out, // .ptr = .{ // .HTTPSRequest = ptr.asPtr(HTTPSRequest), // }, diff --git a/src/js/builtins/ReadableStream.ts b/src/js/builtins/ReadableStream.ts index c1d43a43059cba..079fc129f2ac37 100644 --- a/src/js/builtins/ReadableStream.ts +++ b/src/js/builtins/ReadableStream.ts @@ -140,7 +140,7 @@ export function readableStreamToArrayBuffer(stream: ReadableStream) var result = Bun.readableStreamToArray(stream); if ($isPromise(result)) { - // `result` is an InternalPromise, which doesn't have a `.$then` method + // `result` is an InternalPromise, which doesn't have a `.then` method // but `.then` isn't user-overridable, so we can use it safely. return result.then(Bun.concatArrayBuffers); } @@ -160,12 +160,12 @@ export function readableStreamToFormData( $linkTimeConstant; export function readableStreamToJSON(stream: ReadableStream): unknown { - return Bun.readableStreamToText(stream).$then(globalThis.JSON.parse); + return Promise.resolve(Bun.readableStreamToText(stream)).then(globalThis.JSON.parse); } $linkTimeConstant; export function readableStreamToBlob(stream: ReadableStream): Promise { - return Promise.resolve(Bun.readableStreamToArray(stream)).$then(array => new Blob(array)); + return Promise.resolve(Bun.readableStreamToArray(stream)).then(array => new Blob(array)); } $linkTimeConstant; diff --git a/src/js/builtins/ReadableStreamInternals.ts b/src/js/builtins/ReadableStreamInternals.ts index ca050ef7dff53e..f3559892f44bf0 100644 --- a/src/js/builtins/ReadableStreamInternals.ts +++ b/src/js/builtins/ReadableStreamInternals.ts @@ -613,7 +613,6 @@ export function isReadableStreamDefaultController(controller) { export function readDirectStream(stream, sink, underlyingSource) { $putByIdDirectPrivate(stream, "underlyingSource", undefined); $putByIdDirectPrivate(stream, "start", undefined); - function close(stream, reason) { if (reason && underlyingSource?.cancel) { try { @@ -647,21 +646,24 @@ export function readDirectStream(stream, sink, underlyingSource) { $throwTypeError("pull is not a function"); return; } - $putByIdDirectPrivate(stream, "readableStreamController", sink); const highWaterMark = $getByIdDirectPrivate(stream, "highWaterMark"); - sink.start({ highWaterMark: !highWaterMark || highWaterMark < 64 ? 64 : highWaterMark, }); $startDirectStream.$call(sink, stream, underlyingSource.pull, close, stream.$asyncContext); + $putByIdDirectPrivate(stream, "reader", {}); var maybePromise = underlyingSource.pull(sink); sink = undefined; if (maybePromise && $isPromise(maybePromise)) { - return maybePromise.$then(() => {}); + if (maybePromise.$then) { + return maybePromise.$then(() => {}); + } + + return maybePromise.then(() => {}); } } @@ -1245,7 +1247,6 @@ export function readableStreamError(stream, error) { $assert($getByIdDirectPrivate(stream, "state") === $streamReadable); $putByIdDirectPrivate(stream, "state", $streamErrored); $putByIdDirectPrivate(stream, "storedError", error); - const reader = $getByIdDirectPrivate(stream, "reader"); if (!reader) return; @@ -1515,6 +1516,88 @@ export function readableStreamDefaultControllerCanCloseOrEnqueue(controller) { return $getByIdDirectPrivate(controlledReadableStream, "state") === $streamReadable; } +export function readableStreamFromAsyncIterator(target, fn) { + var cancelled = false, + iter: AsyncIterator; + return new ReadableStream({ + type: "direct", + + cancel(reason) { + $debug("readableStreamFromAsyncIterator.cancel", reason); + cancelled = true; + + if (iter) { + iter.throw?.((reason ||= new DOMException("ReadableStream has been cancelled", "AbortError"))); + } + }, + + async pull(controller) { + // we deliberately want to throw on error + iter = fn.$call(target, controller); + fn = target = undefined; + + if (!$isAsyncGenerator(iter) && typeof iter.next !== "function") { + iter = undefined; + throw new TypeError("Expected an async generator"); + } + + var closingError, value, done, immediateTask; + + try { + while (!cancelled && !done) { + const promise = iter.next(controller); + if (cancelled) { + return; + } + + if ( + $isPromise(promise) && + ($getPromiseInternalField(promise, $promiseFieldFlags) & $promiseStateMask) === $promiseStateFulfilled + ) { + clearImmediate(immediateTask); + ({ value, done } = $getPromiseInternalField(promise, $promiseFieldReactionsOrResult)); + $assert(!$isPromise(value), "Expected a value, not a promise"); + } else { + immediateTask = setImmediate(() => immediateTask && controller?.flush?.(true)); + ({ value, done } = await promise); + + if (cancelled) { + return; + } + } + + if (!$isUndefinedOrNull(value)) { + controller.write(value); + } + } + } catch (e) { + closingError = e; + } finally { + clearImmediate(immediateTask); + immediateTask = undefined; + + // Stream was closed before we tried writing to it. + if (closingError?.code === "ERR_INVALID_THIS") { + await iter.return?.(); + return; + } + + if (closingError) { + try { + await iter.throw?.(closingError); + } finally { + throw closingError; + } + } else { + await controller.end(); + await iter.return?.(); + } + iter = undefined; + } + }, + }); +} + export function lazyLoadStream(stream, autoAllocateChunkSize) { $debug("lazyLoadStream", stream, autoAllocateChunkSize); var nativeType = $getByIdDirectPrivate(stream, "bunNativeType"); @@ -1764,25 +1847,32 @@ export function readableStreamToArrayBufferDirect(stream, underlyingSource) { var didError = false; try { - const firstPull = pull(controller); - if (firstPull && $isObject(firstPull) && $isPromise(firstPull)) { - return (async function (controller, promise, pull) { - while (!ended) { - await pull(controller); - } - return await promise; - })(controller, promise, pull); - } - - return capability.promise; + var firstPull = pull(controller); } catch (e) { didError = true; $readableStreamError(stream, e); return Promise.$reject(e); } finally { - if (!didError && stream) $readableStreamClose(stream); - controller = close = sink = pull = stream = undefined; + if (!$isPromise(firstPull)) { + if (!didError && stream) $readableStreamClose(stream); + controller = close = sink = pull = stream = undefined; + return capability.promise; + } } + + $assert($isPromise(firstPull)); + return firstPull.then( + () => { + if (!didError && stream) $readableStreamClose(stream); + controller = close = sink = pull = stream = undefined; + return capability.promise; + }, + e => { + didError = true; + if ($getByIdDirectPrivate(stream, "state") === $streamReadable) $readableStreamError(stream, e); + return Promise.$reject(e); + }, + ); } export async function readableStreamToTextDirect(stream, underlyingSource) { diff --git a/test/js/bun/http/async-iterator-stream.test.ts b/test/js/bun/http/async-iterator-stream.test.ts new file mode 100644 index 00000000000000..0befeefda77acb --- /dev/null +++ b/test/js/bun/http/async-iterator-stream.test.ts @@ -0,0 +1,227 @@ +import { describe, expect, test } from "bun:test"; +import { bunExe, bunEnv } from "harness"; +import path from "path"; + +describe("Streaming body via", () => { + test("async generator function", async () => { + const server = Bun.serve({ + port: 0, + + async fetch(req) { + return new Response(async function* yo() { + yield "Hello, "; + await Bun.sleep(30); + yield Buffer.from("world!"); + return "!"; + }); + }, + }); + + const res = await fetch(`${server.url}/`); + const chunks = []; + for await (const chunk of res.body) { + chunks.push(chunk); + } + + expect(Buffer.concat(chunks).toString()).toBe("Hello, world!!"); + expect(chunks).toHaveLength(2); + server.stop(true); + }); + + test("async generator function throws an error but continues to send the headers", async () => { + const server = Bun.serve({ + port: 0, + + async fetch(req) { + return new Response( + async function* () { + throw new Error("Oops"); + }, + { + headers: { + "X-Hey": "123", + }, + }, + ); + }, + }); + + const res = await fetch(server.url); + expect(res.headers.get("X-Hey")).toBe("123"); + server.stop(true); + }); + + test("async generator aborted doesn't crash", async () => { + var aborter = new AbortController(); + const server = Bun.serve({ + port: 0, + + async fetch(req) { + return new Response( + async function* yo() { + queueMicrotask(() => aborter.abort()); + yield "123"; + await Bun.sleep(0); + }, + { + headers: { + "X-Hey": "123", + }, + }, + ); + }, + }); + try { + const res = await fetch(`${server.url}/`, { signal: aborter.signal }); + } catch (e) { + expect(e).toBeInstanceOf(DOMException); + expect(e.name).toBe("AbortError"); + } finally { + server.stop(true); + } + }); + + test("[Symbol.asyncIterator]", async () => { + const server = Bun.serve({ + port: 0, + + async fetch(req) { + return new Response({ + async *[Symbol.asyncIterator]() { + var controller = yield "my string goes here\n"; + var controller2 = yield Buffer.from("my buffer goes here\n"); + await Bun.sleep(30); + yield Buffer.from("end!\n"); + if (controller !== controller2 || typeof controller.sinkId !== "number") { + throw new Error("Controller mismatch"); + } + return "!"; + }, + }); + }, + }); + + const res = await fetch(`${server.url}/`); + const chunks = []; + for await (const chunk of res.body) { + chunks.push(chunk); + } + + expect(Buffer.concat(chunks).toString()).toBe("my string goes here\nmy buffer goes here\nend!\n!"); + expect(chunks).toHaveLength(2); + server.stop(true); + }); + + test("[Symbol.asyncIterator] with a custom iterator", async () => { + const server = Bun.serve({ + port: 0, + + async fetch(req) { + var hasRun = false; + return new Response({ + [Symbol.asyncIterator]() { + return { + async next() { + await Bun.sleep(30); + + if (hasRun) { + return { value: Buffer.from("world!"), done: true }; + } + + hasRun = true; + return { value: "Hello, ", done: false }; + }, + }; + }, + }); + }, + }); + + const res = await fetch(server.url); + const chunks = []; + for await (const chunk of res.body) { + chunks.push(chunk); + } + + expect(Buffer.concat(chunks).toString()).toBe("Hello, world!"); + // TODO: + // expect(chunks).toHaveLength(2); + server.stop(true); + }); + + test("yield", async () => { + const response = new Response({ + [Symbol.asyncIterator]: async function* () { + const controller = yield "hello"; + await controller.end(); + }, + }); + + expect(await response.text()).toBe("hello"); + }); + + const callbacks = [ + { + fn: async function* () { + yield '"Hello, '; + yield Buffer.from('world! #1"'); + return; + }, + expected: '"Hello, world! #1"', + }, + { + fn: async function* () { + yield '"Hello, '; + await Bun.sleep(30); + yield Buffer.from('world! #2"'); + return; + }, + expected: '"Hello, world! #2"', + }, + { + fn: async function* () { + yield '"Hello, '; + await 42; + yield Buffer.from('world! #3"'); + return; + }, + expected: '"Hello, world! #3"', + }, + { + fn: async function* () { + yield '"Hello, '; + await 42; + return Buffer.from('world! #4"'); + }, + expected: '"Hello, world! #4"', + }, + ]; + + for (let { fn, expected } of callbacks) { + describe(expected, () => { + for (let bodyInit of [fn, { [Symbol.asyncIterator]: fn }] as const) { + for (let [label, constructFn] of [ + ["Response", () => new Response(bodyInit)], + ["Request", () => new Request({ "url": "https://example.com", body: bodyInit })], + ]) { + for (let method of ["arrayBuffer", "text"]) { + test(`${label}(${method})`, async () => { + const result = await constructFn()[method](); + expect(Buffer.from(result)).toEqual(Buffer.from(expected)); + }); + } + + test(`${label}(json)`, async () => { + const result = await constructFn().json(); + expect(result).toEqual(JSON.parse(expected)); + }); + + test(`${label}(blob)`, async () => { + const result = await constructFn().blob(); + expect(await result.arrayBuffer()).toEqual(await new Blob([expected]).arrayBuffer()); + }); + } + } + }); + } +});