diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts index e95da2f4af..df41505204 100644 --- a/src/execution/IncrementalGraph.ts +++ b/src/execution/IncrementalGraph.ts @@ -1,13 +1,15 @@ import { isPromise } from '../jsutils/isPromise.js'; import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; +import type { GraphQLError } from '../error/GraphQLError.js'; + import type { DeferredFragmentRecord, DeferredGroupedFieldSetRecord, IncrementalDataRecord, IncrementalDataRecordResult, ReconcilableDeferredGroupedFieldSetResult, - StreamItemsRecord, + StreamItemRecord, StreamRecord, SubsequentResultRecord, } from './types.js'; @@ -27,9 +29,9 @@ function isDeferredFragmentNode( } function isStreamNode( - subsequentResultNode: SubsequentResultNode, -): subsequentResultNode is StreamRecord { - return 'path' in subsequentResultNode; + record: SubsequentResultNode | IncrementalDataRecord, +): record is StreamRecord { + return 'streamItemQueue' in record; } type SubsequentResultNode = DeferredFragmentNode | StreamRecord; @@ -67,7 +69,7 @@ export class IncrementalGraph { if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) { this._addDeferredGroupedFieldSetRecord(incrementalDataRecord); } else { - this._addStreamItemsRecord(incrementalDataRecord); + this._addStreamRecord(incrementalDataRecord); } } } @@ -95,6 +97,7 @@ export class IncrementalGraph { if (isStreamNode(node)) { this._pending.add(node); newPending.push(node); + this._newIncrementalDataRecords.add(node); } else if (node.deferredGroupedFieldSetRecords.size > 0) { for (const deferredGroupedFieldSetNode of node.deferredGroupedFieldSetRecords) { this._newIncrementalDataRecords.add(deferredGroupedFieldSetNode); @@ -110,12 +113,20 @@ export class IncrementalGraph { this._newPending.clear(); for (const incrementalDataRecord of this._newIncrementalDataRecords) { - const result = incrementalDataRecord.result.value; - if (isPromise(result)) { + if (isStreamNode(incrementalDataRecord)) { // eslint-disable-next-line @typescript-eslint/no-floating-promises - result.then((resolved) => this._enqueue(resolved)); + this._onStreamItems( + incrementalDataRecord, + incrementalDataRecord.streamItemQueue, + ); } else { - this._enqueue(result); + const result = incrementalDataRecord.result.value; + if (isPromise(result)) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + result.then((resolved) => this._enqueue(resolved)); + } else { + this._enqueue(result); + } } } this._newIncrementalDataRecords.clear(); @@ -246,12 +257,8 @@ export class IncrementalGraph { } } - private _addStreamItemsRecord(streamItemsRecord: StreamItemsRecord): void { - const streamRecord = streamItemsRecord.streamRecord; - if (!this._pending.has(streamRecord)) { - this._newPending.add(streamRecord); - } - this._newIncrementalDataRecords.add(streamItemsRecord); + private _addStreamRecord(streamRecord: StreamRecord): void { + this._newPending.add(streamRecord); } private _addDeferredFragmentNode( @@ -283,6 +290,66 @@ export class IncrementalGraph { return deferredFragmentNode; } + private async _onStreamItems( + streamRecord: StreamRecord, + streamItemQueue: Array, + ): Promise { + let items: Array = []; + let errors: Array = []; + let incrementalDataRecords: Array = []; + let streamItemRecord: StreamItemRecord | undefined; + while ((streamItemRecord = streamItemQueue.shift()) !== undefined) { + let result = streamItemRecord.value; + if (isPromise(result)) { + if (items.length > 0) { + this._enqueue({ + streamRecord, + result: + // TODO add additional test case or rework for coverage + errors.length > 0 /* c8 ignore start */ + ? { items, errors } /* c8 ignore stop */ + : { items }, + incrementalDataRecords, + }); + items = []; + errors = []; + incrementalDataRecords = []; + } + // eslint-disable-next-line no-await-in-loop + result = await result; + // wait an additional tick to coalesce resolving additional promises + // within the queue + // eslint-disable-next-line no-await-in-loop + await Promise.resolve(); + } + if (result.item === undefined) { + if (items.length > 0) { + this._enqueue({ + streamRecord, + result: errors.length > 0 ? { items, errors } : { items }, + incrementalDataRecords, + }); + } + this._enqueue( + result.errors === undefined + ? { streamRecord } + : { + streamRecord, + errors: result.errors, + }, + ); + return; + } + items.push(result.item); + if (result.errors !== undefined) { + errors.push(...result.errors); + } + if (result.incrementalDataRecords !== undefined) { + incrementalDataRecords.push(...result.incrementalDataRecords); + } + } + } + private *_yieldCurrentCompletedIncrementalData( first: IncrementalDataRecordResult, ): Generator { diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index 461eeb4f93..d3d8749d7e 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -146,10 +146,7 @@ describe('Execute: stream directive', () => { hasNext: true, }, { - incremental: [ - { items: ['banana'], id: '0' }, - { items: ['coconut'], id: '0' }, - ], + incremental: [{ items: ['banana', 'coconut'], id: '0' }], completed: [{ id: '0' }], hasNext: false, }, @@ -169,11 +166,7 @@ describe('Execute: stream directive', () => { hasNext: true, }, { - incremental: [ - { items: ['apple'], id: '0' }, - { items: ['banana'], id: '0' }, - { items: ['coconut'], id: '0' }, - ], + incremental: [{ items: ['apple', 'banana', 'coconut'], id: '0' }], completed: [{ id: '0' }], hasNext: false, }, @@ -220,11 +213,7 @@ describe('Execute: stream directive', () => { { incremental: [ { - items: ['banana'], - id: '0', - }, - { - items: ['coconut'], + items: ['banana', 'coconut'], id: '0', }, ], @@ -284,11 +273,10 @@ describe('Execute: stream directive', () => { { incremental: [ { - items: [['banana', 'banana', 'banana']], - id: '0', - }, - { - items: [['coconut', 'coconut', 'coconut']], + items: [ + ['banana', 'banana', 'banana'], + ['coconut', 'coconut', 'coconut'], + ], id: '0', }, ], @@ -366,15 +354,11 @@ describe('Execute: stream directive', () => { { incremental: [ { - items: [{ name: 'Luke', id: '1' }], - id: '0', - }, - { - items: [{ name: 'Han', id: '2' }], - id: '0', - }, - { - items: [{ name: 'Leia', id: '3' }], + items: [ + { name: 'Luke', id: '1' }, + { name: 'Han', id: '2' }, + { name: 'Leia', id: '3' }, + ], id: '0', }, ], @@ -507,7 +491,7 @@ describe('Execute: stream directive', () => { { incremental: [ { - items: [null], + items: [null, { name: 'Leia', id: '3' }], id: '0', errors: [ { @@ -517,10 +501,6 @@ describe('Execute: stream directive', () => { }, ], }, - { - items: [{ name: 'Leia', id: '3' }], - id: '0', - }, ], completed: [{ id: '0' }], hasNext: false, @@ -557,6 +537,11 @@ describe('Execute: stream directive', () => { items: [{ name: 'Luke', id: '1' }], id: '0', }, + ], + hasNext: true, + }, + { + incremental: [ { items: [{ name: 'Han', id: '2' }], id: '0', @@ -910,7 +895,7 @@ describe('Execute: stream directive', () => { { incremental: [ { - items: [null], + items: [null, { nonNullName: 'Han' }], id: '0', errors: [ { @@ -920,10 +905,6 @@ describe('Execute: stream directive', () => { }, ], }, - { - items: [{ nonNullName: 'Han' }], - id: '0', - }, ], completed: [{ id: '0' }], hasNext: false, @@ -956,7 +937,7 @@ describe('Execute: stream directive', () => { { incremental: [ { - items: [null], + items: [null, { nonNullName: 'Han' }], id: '0', errors: [ { @@ -966,10 +947,6 @@ describe('Execute: stream directive', () => { }, ], }, - { - items: [{ nonNullName: 'Han' }], - id: '0', - }, ], completed: [{ id: '0' }], hasNext: false, @@ -1086,7 +1063,7 @@ describe('Execute: stream directive', () => { { incremental: [ { - items: [null], + items: [null, { nonNullName: 'Han' }], id: '0', errors: [ { @@ -1096,10 +1073,6 @@ describe('Execute: stream directive', () => { }, ], }, - { - items: [{ nonNullName: 'Han' }], - id: '0', - }, ], completed: [{ id: '0' }], hasNext: false, @@ -1400,10 +1373,6 @@ describe('Execute: stream directive', () => { }, { incremental: [ - { - items: [{ name: 'Luke' }], - id: '1', - }, { data: { scalarField: null }, id: '0', @@ -1415,6 +1384,10 @@ describe('Execute: stream directive', () => { }, ], }, + { + items: [{ name: 'Luke' }], + id: '1', + }, ], completed: [{ id: '0' }, { id: '1' }], hasNext: false, @@ -1717,11 +1690,10 @@ describe('Execute: stream directive', () => { { incremental: [ { - items: [{ id: '1', name: 'Luke' }], - id: '0', - }, - { - items: [{ id: '2', name: 'Han' }], + items: [ + { id: '1', name: 'Luke' }, + { id: '2', name: 'Han' }, + ], id: '0', }, ], @@ -1783,11 +1755,7 @@ describe('Execute: stream directive', () => { id: '0', }, { - items: [{ name: 'Luke' }], - id: '1', - }, - { - items: [{ name: 'Han' }], + items: [{ name: 'Luke' }, { name: 'Han' }], id: '1', }, ], @@ -1858,14 +1826,14 @@ describe('Execute: stream directive', () => { value: { pending: [{ id: '2', path: ['friendList', 1], label: 'DeferName' }], incremental: [ - { - items: [{ id: '2' }], - id: '1', - }, { data: { name: 'Luke' }, id: '0', }, + { + items: [{ id: '2' }], + id: '1', + }, ], completed: [{ id: '0' }], hasNext: true, @@ -1959,14 +1927,14 @@ describe('Execute: stream directive', () => { value: { pending: [{ id: '2', path: ['friendList', 1], label: 'DeferName' }], incremental: [ - { - items: [{ id: '2' }], - id: '1', - }, { data: { name: 'Luke' }, id: '0', }, + { + items: [{ id: '2' }], + id: '1', + }, ], completed: [{ id: '0' }], hasNext: true, diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 1c9a9024e2..7b87d25060 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -69,11 +69,10 @@ import type { ExecutionResult, ExperimentalIncrementalExecutionResults, IncrementalDataRecord, - StreamItemsRecord, - StreamItemsResult, + StreamItemRecord, + StreamItemResult, StreamRecord, } from './types.js'; -import { isReconcilableStreamItemsResult } from './types.js'; import { getArgumentValues, getDirectiveValues, @@ -1094,17 +1093,29 @@ async function completeAsyncIteratorValue( // eslint-disable-next-line no-constant-condition while (true) { if (streamUsage && index >= streamUsage.initialCount) { + const streamItemQueue = buildAsyncStreamItemQueue( + index, + path, + asyncIterator, + exeContext, + streamUsage.fieldGroup, + info, + itemType, + ); + const returnFn = asyncIterator.return; let streamRecord: StreamRecord | CancellableStreamRecord; if (returnFn === undefined) { streamRecord = { label: streamUsage.label, path, - } as StreamRecord; + streamItemQueue, + }; } else { streamRecord = { label: streamUsage.label, path, + streamItemQueue, earlyReturn: returnFn.bind(asyncIterator), }; if (exeContext.cancellableStreams === undefined) { @@ -1113,18 +1124,7 @@ async function completeAsyncIteratorValue( exeContext.cancellableStreams.add(streamRecord); } - const firstStreamItems = firstAsyncStreamItems( - streamRecord, - path, - index, - asyncIterator, - exeContext, - streamUsage.fieldGroup, - info, - itemType, - ); - - addIncrementalDataRecords(graphqlWrappedResult, [firstStreamItems]); + addIncrementalDataRecords(graphqlWrappedResult, [streamRecord]); break; } @@ -1267,23 +1267,22 @@ function completeIterableValue( const item = iteration.value; if (streamUsage && index >= streamUsage.initialCount) { - const streamRecord: StreamRecord = { + const syncStreamRecord: StreamRecord = { label: streamUsage.label, path, + streamItemQueue: buildSyncStreamItemQueue( + item, + index, + path, + iterator, + exeContext, + streamUsage.fieldGroup, + info, + itemType, + ), }; - const firstStreamItems = firstSyncStreamItems( - streamRecord, - item, - index, - iterator, - exeContext, - streamUsage.fieldGroup, - info, - itemType, - ); - - addIncrementalDataRecords(graphqlWrappedResult, [firstStreamItems]); + addIncrementalDataRecords(graphqlWrappedResult, [syncStreamRecord]); break; } @@ -2217,26 +2216,22 @@ function getDeferredFragmentRecords( ); } -function firstSyncStreamItems( - streamRecord: StreamRecord, +function buildSyncStreamItemQueue( initialItem: PromiseOrValue, initialIndex: number, + streamPath: Path, iterator: Iterator, exeContext: ExecutionContext, fieldGroup: FieldGroup, info: GraphQLResolveInfo, itemType: GraphQLOutputType, -): StreamItemsRecord { - return { - streamRecord, - result: new BoxedPromiseOrValue( +): Array { + const streamItemQueue: Array = [ + new BoxedPromiseOrValue( Promise.resolve().then(() => { - const path = streamRecord.path; - const initialPath = addPath(path, initialIndex, undefined); - - let result = new BoxedPromiseOrValue( - completeStreamItems( - streamRecord, + const initialPath = addPath(streamPath, initialIndex, undefined); + const firstStreamItem = new BoxedPromiseOrValue( + completeStreamItem( initialPath, initialItem, exeContext, @@ -2246,25 +2241,23 @@ function firstSyncStreamItems( itemType, ), ); - const firstStreamItems = { result }; - let currentStreamItems = firstStreamItems; - let currentIndex = initialIndex; let iteration = iterator.next(); - let erroredSynchronously = false; + let currentIndex = initialIndex + 1; + let currentStreamItem = firstStreamItem; while (!iteration.done) { - const value = result.value; - if (!isPromise(value) && !isReconcilableStreamItemsResult(value)) { - erroredSynchronously = true; + // TODO: add test case for early sync termination + /* c8 ignore next 4 */ + const result = currentStreamItem.value; + if (!isPromise(result) && result.errors !== undefined) { break; } - const item = iteration.value; - currentIndex++; - const currentPath = addPath(path, currentIndex, undefined); - result = new BoxedPromiseOrValue( - completeStreamItems( - streamRecord, - currentPath, - item, + + const itemPath = addPath(streamPath, currentIndex, undefined); + + currentStreamItem = new BoxedPromiseOrValue( + completeStreamItem( + itemPath, + iteration.value, exeContext, { errors: undefined }, fieldGroup, @@ -2272,81 +2265,37 @@ function firstSyncStreamItems( itemType, ), ); - - const nextStreamItems: StreamItemsRecord = { streamRecord, result }; - currentStreamItems.result = new BoxedPromiseOrValue( - prependNextStreamItems( - currentStreamItems.result.value, - nextStreamItems, - ), - ); - currentStreamItems = nextStreamItems; + streamItemQueue.push(currentStreamItem); iteration = iterator.next(); + currentIndex = initialIndex + 1; } - // If a non-reconcilable stream items result was encountered, then the stream terminates in error. - // Otherwise, add a stream terminator. - if (!erroredSynchronously) { - currentStreamItems.result = new BoxedPromiseOrValue( - prependNextStreamItems(currentStreamItems.result.value, { - streamRecord, - result: new BoxedPromiseOrValue({ streamRecord }), - }), - ); - } + streamItemQueue.push(new BoxedPromiseOrValue({})); - return firstStreamItems.result.value; + return firstStreamItem.value; }), ), - }; -} - -function prependNextStreamItems( - result: PromiseOrValue, - nextStreamItems: StreamItemsRecord, -): PromiseOrValue { - if (isPromise(result)) { - return result.then((resolved) => - prependNextResolvedStreamItems(resolved, nextStreamItems), - ); - } - return prependNextResolvedStreamItems(result, nextStreamItems); -} + ]; -function prependNextResolvedStreamItems( - result: StreamItemsResult, - nextStreamItems: StreamItemsRecord, -): StreamItemsResult { - if (!isReconcilableStreamItemsResult(result)) { - return result; - } - const incrementalDataRecords = result.incrementalDataRecords; - return { - ...result, - incrementalDataRecords: - incrementalDataRecords === undefined - ? [nextStreamItems] - : [nextStreamItems, ...incrementalDataRecords], - }; + return streamItemQueue; } -function firstAsyncStreamItems( - streamRecord: StreamRecord, - path: Path, +function buildAsyncStreamItemQueue( initialIndex: number, + streamPath: Path, asyncIterator: AsyncIterator, exeContext: ExecutionContext, fieldGroup: FieldGroup, info: GraphQLResolveInfo, itemType: GraphQLOutputType, -): StreamItemsRecord { - const firstStreamItems: StreamItemsRecord = { - streamRecord, - result: new BoxedPromiseOrValue( - getNextAsyncStreamItemsResult( - streamRecord, - path, +): Array { + const streamItemQueue: Array = []; + streamItemQueue.push( + new BoxedPromiseOrValue( + getNextAsyncStreamItemResult( + streamItemQueue, + streamPath, initialIndex, asyncIterator, exeContext, @@ -2355,38 +2304,38 @@ function firstAsyncStreamItems( itemType, ), ), - }; - return firstStreamItems; + ); + return streamItemQueue; } -async function getNextAsyncStreamItemsResult( - streamRecord: StreamRecord, - path: Path, +async function getNextAsyncStreamItemResult( + streamItemQueue: Array, + streamPath: Path, index: number, asyncIterator: AsyncIterator, exeContext: ExecutionContext, fieldGroup: FieldGroup, info: GraphQLResolveInfo, itemType: GraphQLOutputType, -): Promise { +): Promise { let iteration; try { iteration = await asyncIterator.next(); } catch (error) { return { - streamRecord, - errors: [locatedError(error, toNodes(fieldGroup), pathToArray(path))], + errors: [ + locatedError(error, toNodes(fieldGroup), pathToArray(streamPath)), + ], }; } if (iteration.done) { - return { streamRecord }; + return {}; } - const itemPath = addPath(path, index, undefined); + const itemPath = addPath(streamPath, index, undefined); - const result = completeStreamItems( - streamRecord, + const result = completeStreamItem( itemPath, iteration.value, exeContext, @@ -2396,12 +2345,11 @@ async function getNextAsyncStreamItemsResult( itemType, ); - const nextStreamItems: StreamItemsRecord = { - streamRecord, - result: new BoxedPromiseOrValue( - getNextAsyncStreamItemsResult( - streamRecord, - path, + streamItemQueue.push( + new BoxedPromiseOrValue( + getNextAsyncStreamItemResult( + streamItemQueue, + streamPath, index, asyncIterator, exeContext, @@ -2410,13 +2358,12 @@ async function getNextAsyncStreamItemsResult( itemType, ), ), - }; + ); - return prependNextStreamItems(result, nextStreamItems); + return result; } -function completeStreamItems( - streamRecord: StreamRecord, +function completeStreamItem( itemPath: Path, item: unknown, exeContext: ExecutionContext, @@ -2424,7 +2371,7 @@ function completeStreamItems( fieldGroup: FieldGroup, info: GraphQLResolveInfo, itemType: GraphQLOutputType, -): PromiseOrValue { +): PromiseOrValue { if (isPromise(item)) { return completePromisedValue( exeContext, @@ -2437,13 +2384,8 @@ function completeStreamItems( new Map(), ).then( (resolvedItem) => - buildStreamItemsResult( - incrementalContext.errors, - streamRecord, - resolvedItem, - ), + buildStreamItemResult(incrementalContext.errors, resolvedItem), (error) => ({ - streamRecord, errors: withError(incrementalContext.errors, error), }), ); @@ -2475,7 +2417,6 @@ function completeStreamItems( } } catch (error) { return { - streamRecord, errors: withError(incrementalContext.errors, error), }; } @@ -2495,39 +2436,23 @@ function completeStreamItems( }) .then( (resolvedItem) => - buildStreamItemsResult( - incrementalContext.errors, - streamRecord, - resolvedItem, - ), + buildStreamItemResult(incrementalContext.errors, resolvedItem), (error) => ({ - streamRecord, errors: withError(incrementalContext.errors, error), }), ); } - return buildStreamItemsResult( - incrementalContext.errors, - streamRecord, - result, - ); + return buildStreamItemResult(incrementalContext.errors, result); } -function buildStreamItemsResult( +function buildStreamItemResult( errors: ReadonlyArray | undefined, - streamRecord: StreamRecord, result: GraphQLWrappedResult, -): StreamItemsResult { +): StreamItemResult { return { - streamRecord, - result: - errors === undefined - ? { items: [result[0]] } - : { - items: [result[0]], - errors: [...errors], - }, + item: result[0], + errors, incrementalDataRecords: result[1], }; } diff --git a/src/execution/types.ts b/src/execution/types.ts index 5c44e6dea8..9340ab1b85 100644 --- a/src/execution/types.ts +++ b/src/execution/types.ts @@ -217,10 +217,26 @@ export interface DeferredFragmentRecord { parent: DeferredFragmentRecord | undefined; } +export interface StreamItemResult { + item?: unknown; + incrementalDataRecords?: ReadonlyArray | undefined; + errors?: ReadonlyArray | undefined; +} + +export type StreamItemRecord = BoxedPromiseOrValue; + export interface StreamRecord { path: Path; label: string | undefined; id?: string | undefined; + streamItemQueue: Array; +} + +export interface StreamItemsResult { + streamRecord: StreamRecord; + result?: BareStreamItemsResult | undefined; + incrementalDataRecords?: ReadonlyArray | undefined; + errors?: ReadonlyArray | undefined; } export interface CancellableStreamRecord extends StreamRecord { @@ -233,45 +249,9 @@ export function isCancellableStreamRecord( return 'earlyReturn' in subsequentResultRecord; } -interface ReconcilableStreamItemsResult { - streamRecord: StreamRecord; - result: BareStreamItemsResult; - incrementalDataRecords: ReadonlyArray | undefined; - errors?: never; -} - -export function isReconcilableStreamItemsResult( - streamItemsResult: StreamItemsResult, -): streamItemsResult is ReconcilableStreamItemsResult { - return streamItemsResult.result !== undefined; -} - -interface TerminatingStreamItemsResult { - streamRecord: StreamRecord; - result?: never; - incrementalDataRecords?: never; - errors?: never; -} - -interface NonReconcilableStreamItemsResult { - streamRecord: StreamRecord; - errors: ReadonlyArray; - result?: never; -} - -export type StreamItemsResult = - | ReconcilableStreamItemsResult - | TerminatingStreamItemsResult - | NonReconcilableStreamItemsResult; - -export interface StreamItemsRecord { - streamRecord: StreamRecord; - result: BoxedPromiseOrValue; -} - export type IncrementalDataRecord = | DeferredGroupedFieldSetRecord - | StreamItemsRecord; + | StreamRecord; export type IncrementalDataRecordResult = | DeferredGroupedFieldSetResult