Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Subscriptions support #846

Merged
merged 12 commits into from
May 17, 2017
118 changes: 67 additions & 51 deletions src/execution/execute.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ import type {
* Namely, schema of the type system that is currently executing,
* and the fragments defined in the query document
*/
type ExecutionContext = {
export type ExecutionContext = {
schema: GraphQLSchema;
fragments: {[key: string]: FragmentDefinitionNode};
rootValue: mixed;
Expand Down Expand Up @@ -117,22 +117,6 @@ export function execute(
variableValues?: ?{[key: string]: mixed},
operationName?: ?string
): Promise<ExecutionResult> {
invariant(schema, 'Must provide schema');
invariant(document, 'Must provide document');
invariant(
schema instanceof GraphQLSchema,
'Schema must be an instance of GraphQLSchema. Also ensure that there are ' +
'not multiple versions of GraphQL installed in your node_modules directory.'
);

// Variables, if provided, must be an object.
invariant(
!variableValues || typeof variableValues === 'object',
'Variables must be provided as an Object where each property is a ' +
'variable value. Perhaps look to see if an unparsed JSON string ' +
'was provided.'
);

// If a valid context cannot be created due to incorrect arguments,
// this will throw an error.
const context = buildExecutionContext(
Expand Down Expand Up @@ -183,8 +167,11 @@ export function responsePathAsArray(
return flattened.reverse();
}


function addPath(prev: ResponsePath, key: string | number) {
/**
* Given a ResponsePath and a key, return a new ResponsePath containing the
* new key.
*/
export function addPath(prev: ResponsePath, key: string | number) {
return { prev, key };
}

Expand All @@ -194,14 +181,30 @@ function addPath(prev: ResponsePath, key: string | number) {
*
* Throws a GraphQLError if a valid execution context cannot be created.
*/
function buildExecutionContext(
export function buildExecutionContext(
schema: GraphQLSchema,
document: DocumentNode,
rootValue: mixed,
contextValue: mixed,
rawVariableValues: ?{[key: string]: mixed},
operationName: ?string
): ExecutionContext {
invariant(schema, 'Must provide schema');
invariant(document, 'Must provide document');
invariant(
schema instanceof GraphQLSchema,
'Schema must be an instance of GraphQLSchema. Also ensure that there are ' +
'not multiple versions of GraphQL installed in your node_modules directory.'
);

// Variables, if provided, must be an object.
invariant(
!rawVariableValues || typeof rawVariableValues === 'object',
'Variables must be provided as an Object where each property is a ' +
'variable value. Perhaps look to see if an unparsed JSON string ' +
'was provided.'
);

const errors: Array<GraphQLError> = [];
let operation: ?OperationDefinitionNode;
const fragments: {[name: string]: FragmentDefinitionNode} =
Expand Down Expand Up @@ -280,7 +283,7 @@ function executeOperation(
/**
* Extracts the root type of the operation from the schema.
*/
function getOperationRootType(
export function getOperationRootType(
schema: GraphQLSchema,
operation: OperationDefinitionNode
): GraphQLObjectType {
Expand Down Expand Up @@ -408,7 +411,7 @@ function executeFields(
* returns an Interface or Union type, the "runtime type" will be the actual
* Object type returned by that field.
*/
function collectFields(
export function collectFields(
exeContext: ExecutionContext,
runtimeType: GraphQLObjectType,
selectionSet: SelectionSetNode,
Expand Down Expand Up @@ -577,60 +580,68 @@ function resolveField(
return;
}

const returnType = fieldDef.type;
const resolveFn = fieldDef.resolve || defaultFieldResolver;

// The resolve function's optional third argument is a context value that
// is provided to every resolve function within an execution. It is commonly
// used to represent an authenticated user, or request-specific caches.
const context = exeContext.contextValue;

// The resolve function's optional fourth argument is a collection of
// information about the current execution state.
const info: GraphQLResolveInfo = {
fieldName,
const info = buildResolveInfo(
exeContext,
fieldDef,
fieldNodes,
returnType,
parentType,
path,
schema: exeContext.schema,
fragments: exeContext.fragments,
rootValue: exeContext.rootValue,
operation: exeContext.operation,
variableValues: exeContext.variableValues,
};
path
);

// Get the resolve function, regardless of if its result is normal
// or abrupt (error).
const result = resolveOrError(
const result = resolveFieldValueOrError(
exeContext,
fieldDef,
fieldNode,
fieldNodes,
resolveFn,
source,
context,
info
);

return completeValueCatchingError(
exeContext,
returnType,
fieldDef.type,
fieldNodes,
info,
path,
result
);
}

export function buildResolveInfo(
exeContext: ExecutionContext,
fieldDef: GraphQLField<*, *>,
fieldNodes: Array<FieldNode>,
parentType: GraphQLObjectType,
path: ResponsePath
): GraphQLResolveInfo {
// The resolve function's optional fourth argument is a collection of
// information about the current execution state.
return {
fieldName: fieldNodes[0].name.value,
fieldNodes,
returnType: fieldDef.type,
parentType,
path,
schema: exeContext.schema,
fragments: exeContext.fragments,
rootValue: exeContext.rootValue,
operation: exeContext.operation,
variableValues: exeContext.variableValues,
};
}

// Isolates the "ReturnOrAbrupt" behavior to not de-opt the `resolveField`
// function. Returns the result of resolveFn or the abrupt-return Error object.
function resolveOrError<TSource, TContext>(
export function resolveFieldValueOrError<TSource>(
exeContext: ExecutionContext,
fieldDef: GraphQLField<TSource, TContext>,
fieldNode: FieldNode,
resolveFn: GraphQLFieldResolver<TSource, TContext>,
fieldDef: GraphQLField<TSource, *>,
fieldNodes: Array<FieldNode>,
resolveFn: GraphQLFieldResolver<TSource, *>,
source: TSource,
context: TContext,
info: GraphQLResolveInfo
): Error | mixed {
try {
Expand All @@ -639,10 +650,15 @@ function resolveOrError<TSource, TContext>(
// TODO: find a way to memoize, in case this field is within a List type.
const args = getArgumentValues(
fieldDef,
fieldNode,
fieldNodes[0],
exeContext.variableValues
);

// The resolve function's optional third argument is a context value that
// is provided to every resolve function within an execution. It is commonly
// used to represent an authenticated user, or request-specific caches.
const context = exeContext.contextValue;

return resolveFn(source, args, context, info);
} catch (error) {
// Sometimes a non-error is thrown, wrap it as an Error for a
Expand Down Expand Up @@ -1178,7 +1194,7 @@ function getPromise<T>(value: Promise<T> | mixed): Promise<T> | void {
* added to the query type, but that would require mutating type
* definitions, which would cause issues.
*/
function getFieldDef(
export function getFieldDef(
schema: GraphQLSchema,
parentType: GraphQLObjectType,
fieldName: string
Expand Down
1 change: 1 addition & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ export type {
ExecutionResult,
} from './execution';

export { subscribe, createSubscriptionSourceEventStream } from './subscription';

// Validate GraphQL queries.
export {
Expand Down
64 changes: 64 additions & 0 deletions src/subscription/__tests__/eventEmitterAsyncIterator-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Copyright (c) 2017, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/

import { expect } from 'chai';
import { describe, it } from 'mocha';

import EventEmitter from 'events';
import eventEmitterAsyncIterator from './eventEmitterAsyncIterator';

describe('eventEmitterAsyncIterator', () => {

it('subscribe async-iterator mock', async () => {
// Create an AsyncIterator from an EventEmitter
const emitter = new EventEmitter();
const iterator = eventEmitterAsyncIterator(emitter, 'publish');

// Queue up publishes
expect(emitter.emit('publish', 'Apple')).to.equal(true);
expect(emitter.emit('publish', 'Banana')).to.equal(true);

// Read payloads
expect(await iterator.next()).to.deep.equal(
{ done: false, value: 'Apple' }
);
expect(await iterator.next()).to.deep.equal(
{ done: false, value: 'Banana' }
);

// Read ahead
const i3 = iterator.next().then(x => x);
const i4 = iterator.next().then(x => x);

// Publish
expect(emitter.emit('publish', 'Coconut')).to.equal(true);
expect(emitter.emit('publish', 'Durian')).to.equal(true);

// Await out of order to get correct results
expect(await i4).to.deep.equal({ done: false, value: 'Durian'});
expect(await i3).to.deep.equal({ done: false, value: 'Coconut'});

// Read ahead
const i5 = iterator.next().then(x => x);

// Terminate emitter
await iterator.return();

// Publish is not caught after terminate
expect(emitter.emit('publish', 'Fig')).to.equal(false);

// Find that cancelled read-ahead got a "done" result
expect(await i5).to.deep.equal({ done: true, value: undefined });

// And next returns empty completion value
expect(await iterator.next()).to.deep.equal(
{ done: true, value: undefined }
);
});
});
72 changes: 72 additions & 0 deletions src/subscription/__tests__/eventEmitterAsyncIterator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* Copyright (c) 2017, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
* @flow
*/

import type EventEmitter from 'events';
import { $$asyncIterator } from 'iterall';

/**
* Create an AsyncIterator from an EventEmitter. Useful for mocking a
* PubSub system for tests.
*/
export default function eventEmitterAsyncIterator(
eventEmitter: EventEmitter,
eventName: string
): AsyncIterator<mixed> {
const pullQueue = [];
const pushQueue = [];
let listening = true;
eventEmitter.addListener(eventName, pushValue);

function pushValue(event) {
if (pullQueue.length !== 0) {
pullQueue.shift()({ value: event, done: false });
} else {
pushQueue.push(event);
}
}

function pullValue() {
return new Promise(resolve => {
if (pushQueue.length !== 0) {
resolve({ value: pushQueue.shift(), done: false });
} else {
pullQueue.push(resolve);
}
});
}

function emptyQueue() {
if (listening) {
listening = false;
eventEmitter.removeListener(eventName, pushValue);
pullQueue.forEach(resolve => resolve({ value: undefined, done: true }));
pullQueue.length = 0;
pushQueue.length = 0;
}
}

return {
next() {
return listening ? pullValue() : this.return();
},
return() {
emptyQueue();
return Promise.resolve({ value: undefined, done: true });
},
throw(error) {
emptyQueue();
return Promise.reject(error);
},
[$$asyncIterator]() {
return this;
},
};
}
Loading