Skip to content

Commit

Permalink
feat: Support async generator functions in Response and Request f…
Browse files Browse the repository at this point in the history
…or bodies (#8941)
  • Loading branch information
Jarred-Sumner authored Feb 17, 2024
1 parent e2c92c6 commit abf1239
Show file tree
Hide file tree
Showing 8 changed files with 453 additions and 42 deletions.
39 changes: 39 additions & 0 deletions docs/api/streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,45 @@ const stream = new ReadableStream({

When using a direct `ReadableStream`, all chunk queueing is handled by the destination. The consumer of the stream receives exactly what is passed to `controller.write()`, without any encoding or modification.

## Async generator streams

Bun also supports async generator functions as a source for `Response` and `Request`. This is an easy way to create a `ReadableStream` that fetches data from an asynchronous source.

```ts
const response = new Response(async function* () {
yield "hello";
yield "world";
}());

await response.text(); // "helloworld"
```

You can also use `[Symbol.asyncIterator]` directly.

```ts
const response = new Response({
[Symbol.asyncIterator]: async function* () {
yield "hello";
yield "world";
},
});

await response.text(); // "helloworld"
```

If you need more granular control over the stream, `yield` will return the direct ReadableStream controller.

```ts
const response = new Response({
[Symbol.asyncIterator]: async function* () {
const controller = yield "hello";
await controller.end();
},
});

await response.text(); // "hello"
```

## `Bun.ArrayBufferSink`

The `Bun.ArrayBufferSink` class is a fast incremental writer for constructing an `ArrayBuffer` of unknown size.
Expand Down
67 changes: 58 additions & 9 deletions src/bun.js/bindings/ZigGlobalObject.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@

#include "root.h"
#include "ZigGlobalObject.h"
#include <JavaScriptCore/GlobalObjectMethodTable.h>
#include "helpers.h"
#include "BunClientData.h"

#include "JavaScriptCore/AggregateError.h"
#include "JavaScriptCore/JSObjectInlines.h"
#include "JavaScriptCore/InternalFieldTuple.h"
#include "JavaScriptCore/BytecodeIndex.h"
#include "JavaScriptCore/CallFrameInlines.h"
Expand All @@ -24,7 +24,6 @@
#include "JavaScriptCore/IteratorOperations.h"
#include "JavaScriptCore/JSArray.h"
#include "JavaScriptCore/JSGlobalProxyInlines.h"

#include "JavaScriptCore/JSCallbackConstructor.h"
#include "JavaScriptCore/JSCallbackObject.h"
#include "JavaScriptCore/JSCast.h"
Expand Down Expand Up @@ -2311,19 +2310,69 @@ extern "C" bool ReadableStream__isLocked(JSC__JSValue possibleReadableStream, Zi
return stream != nullptr && ReadableStream::isLocked(globalObject, stream);
}

extern "C" int32_t ReadableStreamTag__tagged(Zig::GlobalObject* globalObject, JSC__JSValue possibleReadableStream, JSValue* ptr);
extern "C" int32_t ReadableStreamTag__tagged(Zig::GlobalObject* globalObject, JSC__JSValue possibleReadableStream, JSValue* ptr)
extern "C" int32_t ReadableStreamTag__tagged(Zig::GlobalObject* globalObject, JSC__JSValue* possibleReadableStream, JSValue* ptr)
{
ASSERT(globalObject);
JSC::JSObject* object = JSValue::decode(possibleReadableStream).getObject();
if (!object || !object->inherits<JSReadableStream>()) {
JSC::JSObject* object = JSValue::decode(*possibleReadableStream).getObject();
if (!object) {
*ptr = JSC::JSValue();
return -1;
}

auto* readableStream = jsCast<JSReadableStream*>(object);
auto& vm = globalObject->vm();
auto& builtinNames = WebCore::clientData(vm)->builtinNames();
const auto& builtinNames = WebCore::builtinNames(vm);

if (!object->inherits<JSReadableStream>()) {
auto throwScope = DECLARE_THROW_SCOPE(vm);
JSValue target = object;
JSValue fn = JSValue();
auto* function = jsDynamicCast<JSC::JSFunction*>(object);
if (function && function->jsExecutable() && function->jsExecutable()->isAsyncGenerator()) {
fn = object;
target = jsUndefined();
} else if (auto iterable = object->getIfPropertyExists(globalObject, vm.propertyNames->asyncIteratorSymbol)) {
if (iterable.isCallable()) {
fn = iterable;
}
}

if (UNLIKELY(throwScope.exception())) {
*ptr = JSC::JSValue();
return -1;
}

if (fn.isEmpty()) {
*ptr = JSC::JSValue();
return -1;
}

auto* createIterator = globalObject->builtinInternalFunctions().readableStreamInternals().m_readableStreamFromAsyncIteratorFunction.get();

JSC::MarkedArgumentBuffer arguments;
arguments.append(target);
arguments.append(fn);

JSC::JSValue result = profiledCall(globalObject, JSC::ProfilingReason::API, createIterator, JSC::getCallData(createIterator), JSC::jsUndefined(), arguments);

if (UNLIKELY(throwScope.exception())) {
return -1;
}

if (!result.isObject()) {
*ptr = JSC::JSValue();
return -1;
}

object = result.getObject();

ASSERT(object->inherits<JSReadableStream>());
*possibleReadableStream = JSValue::encode(object);
*ptr = JSValue();
ensureStillAliveHere(object);
return 0;
}

auto* readableStream = jsCast<JSReadableStream*>(object);

int32_t num = 0;
if (JSValue numberValue = readableStream->getDirect(vm, builtinNames.bunNativeTypePrivateName())) {
Expand Down
12 changes: 8 additions & 4 deletions src/bun.js/bindings/bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4716,6 +4716,7 @@ enum class BuiltinNamesMap : uint8_t {
toString,
redirect,
inspectCustom,
asyncIterator,
};

static JSC::Identifier builtinNameMap(JSC::JSGlobalObject* globalObject, unsigned char name)
Expand Down Expand Up @@ -4753,6 +4754,9 @@ static JSC::Identifier builtinNameMap(JSC::JSGlobalObject* globalObject, unsigne
case BuiltinNamesMap::inspectCustom: {
return Identifier::fromUid(vm.symbolRegistry().symbolForKey("nodejs.util.inspect.custom"_s));
}
case BuiltinNamesMap::asyncIterator: {
return vm.propertyNames->asyncIteratorSymbol;
}
}
}

Expand Down Expand Up @@ -5410,22 +5414,22 @@ extern "C" bool JSGlobalObject__hasException(JSC::JSGlobalObject* globalObject)
return DECLARE_CATCH_SCOPE(globalObject->vm()).exception() != 0;
}

CPP_DECL bool JSC__GetterSetter__isGetterNull(JSC__GetterSetter *gettersetter)
CPP_DECL bool JSC__GetterSetter__isGetterNull(JSC__GetterSetter* gettersetter)
{
return gettersetter->isGetterNull();
}

CPP_DECL bool JSC__GetterSetter__isSetterNull(JSC__GetterSetter *gettersetter)
CPP_DECL bool JSC__GetterSetter__isSetterNull(JSC__GetterSetter* gettersetter)
{
return gettersetter->isSetterNull();
}

CPP_DECL bool JSC__CustomGetterSetter__isGetterNull(JSC__CustomGetterSetter *gettersetter)
CPP_DECL bool JSC__CustomGetterSetter__isGetterNull(JSC__CustomGetterSetter* gettersetter)
{
return gettersetter->getter() == nullptr;
}

CPP_DECL bool JSC__CustomGetterSetter__isSetterNull(JSC__CustomGetterSetter *gettersetter)
CPP_DECL bool JSC__CustomGetterSetter__isSetterNull(JSC__CustomGetterSetter* gettersetter)
{
return gettersetter->setter() == nullptr;
}
1 change: 1 addition & 0 deletions src/bun.js/bindings/bindings.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4541,6 +4541,7 @@ pub const JSValue = enum(JSValueReprInt) {
toString,
redirect,
inspectCustom,
asyncIterator,
};

// intended to be more lightweight than ZigString
Expand Down
17 changes: 9 additions & 8 deletions src/bun.js/webcore/streams.zig
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ pub const ReadableStream = struct {
Bytes: *ByteStream,
};

extern fn ReadableStreamTag__tagged(globalObject: *JSGlobalObject, possibleReadableStream: JSValue, ptr: *JSValue) Tag;
extern fn ReadableStreamTag__tagged(globalObject: *JSGlobalObject, possibleReadableStream: *JSValue, ptr: *JSValue) Tag;
extern fn ReadableStream__isDisturbed(possibleReadableStream: JSValue, globalObject: *JSGlobalObject) bool;
extern fn ReadableStream__isLocked(possibleReadableStream: JSValue, globalObject: *JSGlobalObject) bool;
extern fn ReadableStream__empty(*JSGlobalObject) JSC.JSValue;
Expand Down Expand Up @@ -269,41 +269,42 @@ pub const ReadableStream = struct {
pub fn fromJS(value: JSValue, globalThis: *JSGlobalObject) ?ReadableStream {
JSC.markBinding(@src());
var ptr = JSValue.zero;
return switch (ReadableStreamTag__tagged(globalThis, value, &ptr)) {
var out = value;
return switch (ReadableStreamTag__tagged(globalThis, &out, &ptr)) {
.JavaScript => ReadableStream{
.value = value,
.value = out,
.ptr = .{
.JavaScript = {},
},
},
.Blob => ReadableStream{
.value = value,
.value = out,
.ptr = .{
.Blob = ptr.asPtr(ByteBlobLoader),
},
},
.File => ReadableStream{
.value = value,
.value = out,
.ptr = .{
.File = ptr.asPtr(FileReader),
},
},

.Bytes => ReadableStream{
.value = value,
.value = out,
.ptr = .{
.Bytes = ptr.asPtr(ByteStream),
},
},

// .HTTPRequest => ReadableStream{
// .value = value,
// .value = out,
// .ptr = .{
// .HTTPRequest = ptr.asPtr(HTTPRequest),
// },
// },
// .HTTPSRequest => ReadableStream{
// .value = value,
// .value = out,
// .ptr = .{
// .HTTPSRequest = ptr.asPtr(HTTPSRequest),
// },
Expand Down
6 changes: 3 additions & 3 deletions src/js/builtins/ReadableStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ export function readableStreamToArrayBuffer(stream: ReadableStream<ArrayBuffer>)

var result = Bun.readableStreamToArray(stream);
if ($isPromise(result)) {
// `result` is an InternalPromise, which doesn't have a `.$then` method
// `result` is an InternalPromise, which doesn't have a `.then` method
// but `.then` isn't user-overridable, so we can use it safely.
return result.then(Bun.concatArrayBuffers);
}
Expand All @@ -160,12 +160,12 @@ export function readableStreamToFormData(

$linkTimeConstant;
export function readableStreamToJSON(stream: ReadableStream): unknown {
return Bun.readableStreamToText(stream).$then(globalThis.JSON.parse);
return Promise.resolve(Bun.readableStreamToText(stream)).then(globalThis.JSON.parse);
}

$linkTimeConstant;
export function readableStreamToBlob(stream: ReadableStream): Promise<Blob> {
return Promise.resolve(Bun.readableStreamToArray(stream)).$then(array => new Blob(array));
return Promise.resolve(Bun.readableStreamToArray(stream)).then(array => new Blob(array));
}

$linkTimeConstant;
Expand Down
Loading

0 comments on commit abf1239

Please # to comment.