diff --git a/src/execution/execute.js b/src/execution/execute.js index 67e4146746..1fc4e1bd36 100644 --- a/src/execution/execute.js +++ b/src/execution/execute.js @@ -80,7 +80,7 @@ import type { * Namely, schema of the type system that is currently executing, * and the fragments defined in the query document */ -type ExecutionContext = { +export type ExecutionContext = { schema: GraphQLSchema; fragments: {[key: string]: FragmentDefinitionNode}; rootValue: mixed; @@ -117,22 +117,6 @@ export function execute( variableValues?: ?{[key: string]: mixed}, operationName?: ?string ): Promise { - invariant(schema, 'Must provide schema'); - invariant(document, 'Must provide document'); - invariant( - schema instanceof GraphQLSchema, - 'Schema must be an instance of GraphQLSchema. Also ensure that there are ' + - 'not multiple versions of GraphQL installed in your node_modules directory.' - ); - - // Variables, if provided, must be an object. - invariant( - !variableValues || typeof variableValues === 'object', - 'Variables must be provided as an Object where each property is a ' + - 'variable value. Perhaps look to see if an unparsed JSON string ' + - 'was provided.' - ); - // If a valid context cannot be created due to incorrect arguments, // this will throw an error. const context = buildExecutionContext( @@ -183,8 +167,11 @@ export function responsePathAsArray( return flattened.reverse(); } - -function addPath(prev: ResponsePath, key: string | number) { +/** + * Given a ResponsePath and a key, return a new ResponsePath containing the + * new key. + */ +export function addPath(prev: ResponsePath, key: string | number) { return { prev, key }; } @@ -194,7 +181,7 @@ function addPath(prev: ResponsePath, key: string | number) { * * Throws a GraphQLError if a valid execution context cannot be created. */ -function buildExecutionContext( +export function buildExecutionContext( schema: GraphQLSchema, document: DocumentNode, rootValue: mixed, @@ -202,6 +189,22 @@ function buildExecutionContext( rawVariableValues: ?{[key: string]: mixed}, operationName: ?string ): ExecutionContext { + invariant(schema, 'Must provide schema'); + invariant(document, 'Must provide document'); + invariant( + schema instanceof GraphQLSchema, + 'Schema must be an instance of GraphQLSchema. Also ensure that there are ' + + 'not multiple versions of GraphQL installed in your node_modules directory.' + ); + + // Variables, if provided, must be an object. + invariant( + !rawVariableValues || typeof rawVariableValues === 'object', + 'Variables must be provided as an Object where each property is a ' + + 'variable value. Perhaps look to see if an unparsed JSON string ' + + 'was provided.' + ); + const errors: Array = []; let operation: ?OperationDefinitionNode; const fragments: {[name: string]: FragmentDefinitionNode} = @@ -280,7 +283,7 @@ function executeOperation( /** * Extracts the root type of the operation from the schema. */ -function getOperationRootType( +export function getOperationRootType( schema: GraphQLSchema, operation: OperationDefinitionNode ): GraphQLObjectType { @@ -408,7 +411,7 @@ function executeFields( * returns an Interface or Union type, the "runtime type" will be the actual * Object type returned by that field. */ -function collectFields( +export function collectFields( exeContext: ExecutionContext, runtimeType: GraphQLObjectType, selectionSet: SelectionSetNode, @@ -577,44 +580,30 @@ function resolveField( return; } - const returnType = fieldDef.type; const resolveFn = fieldDef.resolve || defaultFieldResolver; - // The resolve function's optional third argument is a context value that - // is provided to every resolve function within an execution. It is commonly - // used to represent an authenticated user, or request-specific caches. - const context = exeContext.contextValue; - - // The resolve function's optional fourth argument is a collection of - // information about the current execution state. - const info: GraphQLResolveInfo = { - fieldName, + const info = buildResolveInfo( + exeContext, + fieldDef, fieldNodes, - returnType, parentType, - path, - schema: exeContext.schema, - fragments: exeContext.fragments, - rootValue: exeContext.rootValue, - operation: exeContext.operation, - variableValues: exeContext.variableValues, - }; + path + ); // Get the resolve function, regardless of if its result is normal // or abrupt (error). - const result = resolveOrError( + const result = resolveFieldValueOrError( exeContext, fieldDef, - fieldNode, + fieldNodes, resolveFn, source, - context, info ); return completeValueCatchingError( exeContext, - returnType, + fieldDef.type, fieldNodes, info, path, @@ -622,15 +611,37 @@ function resolveField( ); } +export function buildResolveInfo( + exeContext: ExecutionContext, + fieldDef: GraphQLField<*, *>, + fieldNodes: Array, + parentType: GraphQLObjectType, + path: ResponsePath +): GraphQLResolveInfo { + // The resolve function's optional fourth argument is a collection of + // information about the current execution state. + return { + fieldName: fieldNodes[0].name.value, + fieldNodes, + returnType: fieldDef.type, + parentType, + path, + schema: exeContext.schema, + fragments: exeContext.fragments, + rootValue: exeContext.rootValue, + operation: exeContext.operation, + variableValues: exeContext.variableValues, + }; +} + // Isolates the "ReturnOrAbrupt" behavior to not de-opt the `resolveField` // function. Returns the result of resolveFn or the abrupt-return Error object. -function resolveOrError( +export function resolveFieldValueOrError( exeContext: ExecutionContext, - fieldDef: GraphQLField, - fieldNode: FieldNode, - resolveFn: GraphQLFieldResolver, + fieldDef: GraphQLField, + fieldNodes: Array, + resolveFn: GraphQLFieldResolver, source: TSource, - context: TContext, info: GraphQLResolveInfo ): Error | mixed { try { @@ -639,10 +650,15 @@ function resolveOrError( // TODO: find a way to memoize, in case this field is within a List type. const args = getArgumentValues( fieldDef, - fieldNode, + fieldNodes[0], exeContext.variableValues ); + // The resolve function's optional third argument is a context value that + // is provided to every resolve function within an execution. It is commonly + // used to represent an authenticated user, or request-specific caches. + const context = exeContext.contextValue; + return resolveFn(source, args, context, info); } catch (error) { // Sometimes a non-error is thrown, wrap it as an Error for a @@ -1178,7 +1194,7 @@ function getPromise(value: Promise | mixed): Promise | void { * added to the query type, but that would require mutating type * definitions, which would cause issues. */ -function getFieldDef( +export function getFieldDef( schema: GraphQLSchema, parentType: GraphQLObjectType, fieldName: string diff --git a/src/index.js b/src/index.js index 64952bbb0d..a031932503 100644 --- a/src/index.js +++ b/src/index.js @@ -243,6 +243,7 @@ export type { ExecutionResult, } from './execution'; +export { subscribe, createSubscriptionSourceEventStream } from './subscription'; // Validate GraphQL queries. export { diff --git a/src/subscription/__tests__/eventEmitterAsyncIterator-test.js b/src/subscription/__tests__/eventEmitterAsyncIterator-test.js new file mode 100644 index 0000000000..683e96142d --- /dev/null +++ b/src/subscription/__tests__/eventEmitterAsyncIterator-test.js @@ -0,0 +1,64 @@ +/** + * Copyright (c) 2017, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +import { expect } from 'chai'; +import { describe, it } from 'mocha'; + +import EventEmitter from 'events'; +import eventEmitterAsyncIterator from './eventEmitterAsyncIterator'; + +describe('eventEmitterAsyncIterator', () => { + + it('subscribe async-iterator mock', async () => { + // Create an AsyncIterator from an EventEmitter + const emitter = new EventEmitter(); + const iterator = eventEmitterAsyncIterator(emitter, 'publish'); + + // Queue up publishes + expect(emitter.emit('publish', 'Apple')).to.equal(true); + expect(emitter.emit('publish', 'Banana')).to.equal(true); + + // Read payloads + expect(await iterator.next()).to.deep.equal( + { done: false, value: 'Apple' } + ); + expect(await iterator.next()).to.deep.equal( + { done: false, value: 'Banana' } + ); + + // Read ahead + const i3 = iterator.next().then(x => x); + const i4 = iterator.next().then(x => x); + + // Publish + expect(emitter.emit('publish', 'Coconut')).to.equal(true); + expect(emitter.emit('publish', 'Durian')).to.equal(true); + + // Await out of order to get correct results + expect(await i4).to.deep.equal({ done: false, value: 'Durian'}); + expect(await i3).to.deep.equal({ done: false, value: 'Coconut'}); + + // Read ahead + const i5 = iterator.next().then(x => x); + + // Terminate emitter + await iterator.return(); + + // Publish is not caught after terminate + expect(emitter.emit('publish', 'Fig')).to.equal(false); + + // Find that cancelled read-ahead got a "done" result + expect(await i5).to.deep.equal({ done: true, value: undefined }); + + // And next returns empty completion value + expect(await iterator.next()).to.deep.equal( + { done: true, value: undefined } + ); + }); +}); diff --git a/src/subscription/__tests__/eventEmitterAsyncIterator.js b/src/subscription/__tests__/eventEmitterAsyncIterator.js new file mode 100644 index 0000000000..8fe45a4ec3 --- /dev/null +++ b/src/subscription/__tests__/eventEmitterAsyncIterator.js @@ -0,0 +1,72 @@ +/** + * Copyright (c) 2017, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + * + * @flow + */ + +import type EventEmitter from 'events'; +import { $$asyncIterator } from 'iterall'; + +/** + * Create an AsyncIterator from an EventEmitter. Useful for mocking a + * PubSub system for tests. + */ +export default function eventEmitterAsyncIterator( + eventEmitter: EventEmitter, + eventName: string +): AsyncIterator { + const pullQueue = []; + const pushQueue = []; + let listening = true; + eventEmitter.addListener(eventName, pushValue); + + function pushValue(event) { + if (pullQueue.length !== 0) { + pullQueue.shift()({ value: event, done: false }); + } else { + pushQueue.push(event); + } + } + + function pullValue() { + return new Promise(resolve => { + if (pushQueue.length !== 0) { + resolve({ value: pushQueue.shift(), done: false }); + } else { + pullQueue.push(resolve); + } + }); + } + + function emptyQueue() { + if (listening) { + listening = false; + eventEmitter.removeListener(eventName, pushValue); + pullQueue.forEach(resolve => resolve({ value: undefined, done: true })); + pullQueue.length = 0; + pushQueue.length = 0; + } + } + + return { + next() { + return listening ? pullValue() : this.return(); + }, + return() { + emptyQueue(); + return Promise.resolve({ value: undefined, done: true }); + }, + throw(error) { + emptyQueue(); + return Promise.reject(error); + }, + [$$asyncIterator]() { + return this; + }, + }; +} diff --git a/src/subscription/__tests__/mapAsyncIterator-test.js b/src/subscription/__tests__/mapAsyncIterator-test.js new file mode 100644 index 0000000000..5f5946a358 --- /dev/null +++ b/src/subscription/__tests__/mapAsyncIterator-test.js @@ -0,0 +1,193 @@ +/** + * Copyright (c) 2015, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +import { expect } from 'chai'; +import { describe, it } from 'mocha'; +import mapAsyncIterator from '../mapAsyncIterator'; + +describe('mapAsyncIterator', () => { + + it('maps over async values', async () => { + async function* source() { + yield 1; + yield 2; + yield 3; + } + + const doubles = mapAsyncIterator(source(), x => x + x); + + expect( + await doubles.next() + ).to.deep.equal({ value: 2, done: false }); + expect( + await doubles.next() + ).to.deep.equal({ value: 4, done: false }); + expect( + await doubles.next() + ).to.deep.equal({ value: 6, done: false }); + expect( + await doubles.next() + ).to.deep.equal({ value: undefined, done: true }); + }); + + it('maps over async values with async function', async () => { + async function* source() { + yield 1; + yield 2; + yield 3; + } + + const doubles = mapAsyncIterator(source(), async x => await x + x); + + expect( + await doubles.next() + ).to.deep.equal({ value: 2, done: false }); + expect( + await doubles.next() + ).to.deep.equal({ value: 4, done: false }); + expect( + await doubles.next() + ).to.deep.equal({ value: 6, done: false }); + expect( + await doubles.next() + ).to.deep.equal({ value: undefined, done: true }); + }); + + it('allows returning early from async values', async () => { + async function* source() { + yield 1; + yield 2; + yield 3; + } + + const doubles = mapAsyncIterator(source(), x => x + x); + + expect( + await doubles.next() + ).to.deep.equal({ value: 2, done: false }); + expect( + await doubles.next() + ).to.deep.equal({ value: 4, done: false }); + + // Early return + expect( + await doubles.return() + ).to.deep.equal({ value: undefined, done: true }); + + // Subsequent nexts + expect( + await doubles.next() + ).to.deep.equal({ value: undefined, done: true }); + expect( + await doubles.next() + ).to.deep.equal({ value: undefined, done: true }); + }); + + it('passes through early return from async values', async () => { + async function* source() { + try { + yield 1; + yield 2; + yield 3; + } finally { + yield 'done'; + yield 'last'; + } + } + + const doubles = mapAsyncIterator(source(), x => x + x); + + expect( + await doubles.next() + ).to.deep.equal({ value: 2, done: false }); + expect( + await doubles.next() + ).to.deep.equal({ value: 4, done: false }); + + // Early return + expect( + await doubles.return() + ).to.deep.equal({ value: 'donedone', done: false }); + + // Subsequent nexts may yield from finally block + expect( + await doubles.next() + ).to.deep.equal({ value: 'lastlast', done: false }); + expect( + await doubles.next() + ).to.deep.equal({ value: undefined, done: true }); + }); + + it('allows throwing errors through async generators', async () => { + async function* source() { + yield 1; + yield 2; + yield 3; + } + + const doubles = mapAsyncIterator(source(), x => x + x); + + expect( + await doubles.next() + ).to.deep.equal({ value: 2, done: false }); + expect( + await doubles.next() + ).to.deep.equal({ value: 4, done: false }); + + // Throw error + let caughtError; + try { + await doubles.throw('ouch'); + } catch (e) { + caughtError = e; + } + expect(caughtError).to.equal('ouch'); + + expect( + await doubles.next() + ).to.deep.equal({ value: undefined, done: true }); + expect( + await doubles.next() + ).to.deep.equal({ value: undefined, done: true }); + }); + + it('passes through caught errors through async generators', async () => { + async function* source() { + try { + yield 1; + yield 2; + yield 3; + } catch (e) { + yield e; + } + } + + const doubles = mapAsyncIterator(source(), x => x + x); + + expect( + await doubles.next() + ).to.deep.equal({ value: 2, done: false }); + expect( + await doubles.next() + ).to.deep.equal({ value: 4, done: false }); + + // Throw error + expect( + await doubles.throw('ouch') + ).to.deep.equal({ value: 'ouchouch', done: false }); + + expect( + await doubles.next() + ).to.deep.equal({ value: undefined, done: true }); + expect( + await doubles.next() + ).to.deep.equal({ value: undefined, done: true }); + }); + +}); diff --git a/src/subscription/__tests__/subscribe-test.js b/src/subscription/__tests__/subscribe-test.js new file mode 100644 index 0000000000..a17c6fc530 --- /dev/null +++ b/src/subscription/__tests__/subscribe-test.js @@ -0,0 +1,588 @@ +/** + * Copyright (c) 2015, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +import { expect } from 'chai'; +import { describe, it } from 'mocha'; +import EventEmitter from 'events'; +import eventEmitterAsyncIterator from './eventEmitterAsyncIterator'; +import { subscribe } from '../subscribe'; +import { parse } from '../../language'; +import { + GraphQLSchema, + GraphQLObjectType, + GraphQLList, + GraphQLBoolean, + GraphQLInt, + GraphQLString, +} from '../../type'; + + +describe('Subscribe', () => { + + const EmailType = new GraphQLObjectType({ + name: 'Email', + fields: { + from: { type: GraphQLString }, + subject: { type: GraphQLString }, + message: { type: GraphQLString }, + unread: { type: GraphQLBoolean }, + } + }); + + const InboxType = new GraphQLObjectType({ + name: 'Inbox', + fields: { + total: { + type: GraphQLInt, + resolve: inbox => inbox.emails.length, + }, + unread: { + type: GraphQLInt, + resolve: inbox => inbox.emails.filter(email => email.unread).length, + }, + emails: { type: new GraphQLList(EmailType) }, + } + }); + + const QueryType = new GraphQLObjectType({ + name: 'Query', + fields: { + inbox: { type: InboxType }, + } + }); + + const EmailEventType = new GraphQLObjectType({ + name: 'EmailEvent', + fields: { + email: { type: EmailType }, + inbox: { type: InboxType }, + } + }); + + const SubscriptionType = new GraphQLObjectType({ + name: 'Subscription', + fields: { + importantEmail: { type: EmailEventType }, + } + }); + + const emailSchema = new GraphQLSchema({ + query: QueryType, + subscription: SubscriptionType + }); + + function createSubscription(pubsub, schema = emailSchema) { + const data = { + inbox: { + emails: [ + { + from: 'joe@graphql.org', + subject: 'Hello', + message: 'Hello World', + unread: false, + }, + ], + }, + importantEmail() { + return eventEmitterAsyncIterator(pubsub, 'importantEmail'); + } + }; + + function sendImportantEmail(newEmail) { + data.inbox.emails.push(newEmail); + // Returns true if the event was consumed by a subscriber. + return pubsub.emit('importantEmail', { + importantEmail: { + email: newEmail, + inbox: data.inbox, + } + }); + } + + const ast = parse(` + subscription ($priority: Int = 0) { + importantEmail(priority: $priority) { + email { + from + subject + } + inbox { + unread + total + } + } + } + `); + + // GraphQL `subscribe` has the same call signature as `execute`, but returns + // AsyncIterator instead of Promise. + return { + sendImportantEmail, + subscription: subscribe( + schema, + ast, + data, + null + ), + }; + } + + it('multiple subscription fields defined in schema', async () => { + const pubsub = new EventEmitter(); + const SubscriptionTypeMultiple = new GraphQLObjectType({ + name: 'Subscription', + fields: { + importantEmail: { type: EmailEventType }, + nonImportantEmail: { type: EmailEventType }, + } + }); + + const testSchema = new GraphQLSchema({ + query: QueryType, + subscription: SubscriptionTypeMultiple + }); + + expect(() => { + const { sendImportantEmail } = + createSubscription(pubsub, testSchema); + + sendImportantEmail({ + from: 'yuzhi@graphql.org', + subject: 'Alright', + message: 'Tests are good', + unread: true, + }); + }).not.to.throw(); + }); + + it('should throw when querying for multiple fields', async () => { + const SubscriptionTypeMultiple = new GraphQLObjectType({ + name: 'Subscription', + fields: { + importantEmail: { type: EmailEventType }, + nonImportantEmail: { type: EmailEventType }, + } + }); + + const testSchema = new GraphQLSchema({ + query: QueryType, + subscription: SubscriptionTypeMultiple + }); + + const ast = parse(` + subscription { + importantEmail + nonImportantEmail + } + `); + + expect(() => { + subscribe( + testSchema, + ast, + null, + null); + }).to.throw( + 'A subscription operation must contain exactly one root field.'); + }); + + it('produces payload for multiple subscribe in same subscription', + async () => { + const pubsub = new EventEmitter(); + const { sendImportantEmail, subscription } = createSubscription(pubsub); + const second = createSubscription(pubsub); + + const payload1 = subscription.next(); + const payload2 = second.subscription.next(); + + expect(sendImportantEmail({ + from: 'yuzhi@graphql.org', + subject: 'Alright', + message: 'Tests are good', + unread: true, + })).to.equal(true); + + const expectedPayload = { + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'yuzhi@graphql.org', + subject: 'Alright', + }, + inbox: { + unread: 1, + total: 2, + }, + }, + }, + }, + }; + + expect(await payload1).to.deep.equal(expectedPayload); + expect(await payload2).to.deep.equal(expectedPayload); + }); + + it('produces a payload per subscription event', async () => { + const pubsub = new EventEmitter(); + const { sendImportantEmail, subscription } = createSubscription(pubsub); + + // Wait for the next subscription payload. + const payload = subscription.next(); + + // A new email arrives! + expect(sendImportantEmail({ + from: 'yuzhi@graphql.org', + subject: 'Alright', + message: 'Tests are good', + unread: true, + })).to.equal(true); + + // The previously waited on payload now has a value. + expect(await payload).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'yuzhi@graphql.org', + subject: 'Alright', + }, + inbox: { + unread: 1, + total: 2, + }, + }, + }, + }, + }); + + // Another new email arrives, before subscription.next() is called. + expect(sendImportantEmail({ + from: 'hyo@graphql.org', + subject: 'Tools', + message: 'I <3 making things', + unread: true, + })).to.equal(true); + + // The next waited on payload will have a value. + expect(await subscription.next()).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'hyo@graphql.org', + subject: 'Tools', + }, + inbox: { + unread: 2, + total: 3, + }, + }, + }, + }, + }); + + // The client decides to disconnect. + expect(await subscription.return()).to.deep.equal({ + done: true, + value: undefined, + }); + + // Which may result in disconnecting upstream services as well. + expect(sendImportantEmail({ + from: 'adam@graphql.org', + subject: 'Important', + message: 'Read me please', + unread: true, + })).to.equal(false); // No more listeners. + + // Awaiting a subscription after closing it results in completed results. + expect(await subscription.next()).to.deep.equal({ + done: true, + value: undefined, + }); + }); + + it('produces a payload when there are multiple events', async () => { + const pubsub = new EventEmitter(); + const { sendImportantEmail, subscription } = createSubscription(pubsub); + let payload = subscription.next(); + + // A new email arrives! + expect(sendImportantEmail({ + from: 'yuzhi@graphql.org', + subject: 'Alright', + message: 'Tests are good', + unread: true, + })).to.equal(true); + + expect(await payload).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'yuzhi@graphql.org', + subject: 'Alright', + }, + inbox: { + unread: 1, + total: 2, + }, + }, + }, + }, + }); + + payload = subscription.next(); + + // A new email arrives! + expect(sendImportantEmail({ + from: 'yuzhi@graphql.org', + subject: 'Alright 2', + message: 'Tests are good 2', + unread: true, + })).to.equal(true); + + expect(await payload).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'yuzhi@graphql.org', + subject: 'Alright 2', + }, + inbox: { + unread: 2, + total: 3, + }, + }, + }, + }, + }); + }); + + it('should not trigger when subscription is already done', async () => { + const pubsub = new EventEmitter(); + const { sendImportantEmail, subscription } = createSubscription(pubsub); + let payload = subscription.next(); + + // A new email arrives! + expect(sendImportantEmail({ + from: 'yuzhi@graphql.org', + subject: 'Alright', + message: 'Tests are good', + unread: true, + })).to.equal(true); + + expect(await payload).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'yuzhi@graphql.org', + subject: 'Alright', + }, + inbox: { + unread: 1, + total: 2, + }, + }, + }, + }, + }); + + payload = subscription.next(); + subscription.return(); + + // A new email arrives! + expect(sendImportantEmail({ + from: 'yuzhi@graphql.org', + subject: 'Alright 2', + message: 'Tests are good 2', + unread: true, + })).to.equal(false); + + expect(await payload).to.deep.equal({ + done: true, + value: undefined, + }); + }); + + it('events order is correct when multiple triggered together', async () => { + const pubsub = new EventEmitter(); + const { sendImportantEmail, subscription } = createSubscription(pubsub); + let payload = subscription.next(); + + // A new email arrives! + expect(sendImportantEmail({ + from: 'yuzhi@graphql.org', + subject: 'Message', + message: 'Tests are good', + unread: true, + })).to.equal(true); + + // A new email arrives! + expect(sendImportantEmail({ + from: 'yuzhi@graphql.org', + subject: 'Message 2', + message: 'Tests are good 2', + unread: true, + })).to.equal(true); + + expect(await payload).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'yuzhi@graphql.org', + subject: 'Message', + }, + inbox: { + unread: 2, + total: 3, + }, + }, + }, + }, + }); + + payload = subscription.next(); + + expect(await payload).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'yuzhi@graphql.org', + subject: 'Message 2', + }, + inbox: { + unread: 2, + total: 3, + }, + }, + }, + }, + }); + }); + + it('invalid query should result in error', async () => { + const invalidAST = parse(` + subscription { + invalidField + } + `); + + expect(() => { + subscribe( + emailSchema, + invalidAST, + null, + null); + }).to.throw('This subscription is not defined by the schema.'); + }); + + it('throws when subscription definition doesnt return iterator', () => { + const invalidEmailSchema = new GraphQLSchema({ + query: QueryType, + subscription: new GraphQLObjectType({ + name: 'Subscription', + fields: { + importantEmail: { + type: GraphQLString, + subscribe: () => 'test', + }, + } + }) + }); + + const ast = parse(` + subscription { + importantEmail + } + `); + + expect(() => { + subscribe( + invalidEmailSchema, + ast, + null, + null); + }).to.throw('Subscription must return Async Iterable.'); + }); + + it('expects to have subscribe on type definition with iterator', () => { + const pubsub = new EventEmitter(); + const invalidEmailSchema = new GraphQLSchema({ + query: QueryType, + subscription: new GraphQLObjectType({ + name: 'Subscription', + fields: { + importantEmail: { + type: GraphQLString, + subscribe: () => eventEmitterAsyncIterator(pubsub, 'importantEmail') + }, + } + }) + }); + + const ast = parse(` + subscription { + importantEmail + } + `); + + expect(() => { + subscribe( + invalidEmailSchema, + ast, + null, + null); + }).not.to.throw(); + }); + + it('should handle error thrown by subscribe method', () => { + const invalidEmailSchema = new GraphQLSchema({ + query: QueryType, + subscription: new GraphQLObjectType({ + name: 'Subscription', + fields: { + importantEmail: { + type: GraphQLString, + subscribe: () => { + throw new Error('test error'); + }, + }, + }, + }) + }); + + const ast = parse(` + subscription { + importantEmail + } + `); + + expect(() => { + subscribe( + invalidEmailSchema, + ast, + null, + null); + }).to.throw('test error'); + }); +}); diff --git a/src/subscription/index.js b/src/subscription/index.js new file mode 100644 index 0000000000..a5c82511f3 --- /dev/null +++ b/src/subscription/index.js @@ -0,0 +1 @@ +export { subscribe, createSubscriptionSourceEventStream } from './subscribe'; diff --git a/src/subscription/mapAsyncIterator.js b/src/subscription/mapAsyncIterator.js new file mode 100644 index 0000000000..5a35dd196a --- /dev/null +++ b/src/subscription/mapAsyncIterator.js @@ -0,0 +1,54 @@ +/** + * Copyright (c) 2017, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + * + * @flow + */ + +import { $$asyncIterator, getAsyncIterator } from 'iterall'; + +/** + * Given an AsyncIterable and a callback function, return an AsyncIterator + * which produces values mapped via calling the callback function. + */ +export default function mapAsyncIterator( + iterable: AsyncIterable, + callback: (value: T) => U +): AsyncIterator { + // Fixes a temporary issue with Regenerator/Babel + // https://github.com/facebook/regenerator/pull/290 + const iterator = iterable.next ? (iterable: any) : getAsyncIterator(iterable); + + function mapResult(result) { + return result.done ? + result : + Promise.resolve(callback(result.value)).then( + mapped => ({ value: mapped, done: false }) + ); + } + + return { + next() { + return iterator.next().then(mapResult); + }, + return() { + if (typeof iterator.return === 'function') { + return iterator.return().then(mapResult); + } + return Promise.resolve({ value: undefined, done: true }); + }, + throw(error) { + if (typeof iterator.throw === 'function') { + return iterator.throw(error).then(mapResult); + } + return Promise.reject(error); + }, + [$$asyncIterator]() { + return this; + }, + }; +} diff --git a/src/subscription/subscribe.js b/src/subscription/subscribe.js new file mode 100644 index 0000000000..d44264bd21 --- /dev/null +++ b/src/subscription/subscribe.js @@ -0,0 +1,160 @@ +/** + * Copyright (c) 2017, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + * + * @flow + */ + +import { isAsyncIterable } from 'iterall'; +import { + addPath, + buildExecutionContext, + collectFields, + defaultFieldResolver, + execute, + getFieldDef, + getOperationRootType, + buildResolveInfo, + resolveFieldValueOrError, +} from '../execution/execute'; +import { GraphQLSchema } from '../type/schema'; +import invariant from '../jsutils/invariant'; +import mapAsyncIterator from './mapAsyncIterator'; + +import type { + ExecutionContext, + ExecutionResult, +} from '../execution/execute'; +import type { + DocumentNode, + OperationDefinitionNode, +} from '../language/ast'; + +export function createSubscriptionSourceEventStream( + schema: GraphQLSchema, + document: DocumentNode, + rootValue?: mixed, + contextValue?: mixed, + variableValues?: ?{[key: string]: mixed}, + operationName?: ?string, +): AsyncIterable { + // If a valid context cannot be created due to incorrect arguments, + // this will throw an error. + const exeContext = buildExecutionContext( + schema, + document, + rootValue, + contextValue, + variableValues, + operationName + ); + + // Call the `subscribe()` resolver or the default resolver to produce an + // AsyncIterable yielding raw payloads. + return resolveSubscription( + exeContext, + exeContext.operation, + rootValue + ); +} + +/** + * Implements the "Subscribing to request" section of the GraphQL specification. + * + * Returns an AsyncIterator + * + * If the arguments to this function do not result in a legal execution context, + * a GraphQLError will be thrown immediately explaining the invalid input. + */ +export function subscribe( + schema: GraphQLSchema, + document: DocumentNode, + rootValue?: mixed, + contextValue?: mixed, + variableValues?: ?{[key: string]: mixed}, + operationName?: ?string, +): AsyncIterator { + const subscription = createSubscriptionSourceEventStream( + schema, + document, + rootValue, + contextValue, + variableValues, + operationName); + + // For each payload yielded from a subscription, map it over the normal + // GraphQL `execute` function, with `payload` as the rootValue. + return mapAsyncIterator( + subscription, + payload => execute( + schema, + document, + payload, + contextValue, + variableValues, + operationName + ) + ); +} + +function resolveSubscription( + exeContext: ExecutionContext, + operation: OperationDefinitionNode, + rootValue: mixed +): AsyncIterable { + const type = getOperationRootType(exeContext.schema, exeContext.operation); + const fields = collectFields( + exeContext, + type, + exeContext.operation.selectionSet, + Object.create(null), + Object.create(null) + ); + const responseNames = Object.keys(fields); + invariant( + responseNames.length === 1, + 'A subscription operation must contain exactly one root field.' + ); + const responseName = responseNames[0]; + const fieldNodes = fields[responseName]; + const fieldNode = fieldNodes[0]; + const fieldDef = getFieldDef(exeContext.schema, type, fieldNode.name.value); + invariant( + fieldDef, + 'This subscription is not defined by the schema.' + ); + + const resolveFn = fieldDef.subscribe || defaultFieldResolver; + + const info = buildResolveInfo( + exeContext, + fieldDef, + fieldNodes, + type, + addPath(undefined, responseName) + ); + + const subscription = resolveFieldValueOrError( + exeContext, + fieldDef, + fieldNodes, + resolveFn, + rootValue, + info + ); + + if (subscription instanceof Error) { + throw subscription; + } + + invariant( + isAsyncIterable(subscription), + 'Subscription must return Async Iterable.' + ); + + return (subscription: any); +} diff --git a/src/type/definition.js b/src/type/definition.js index 1e79aeeced..4311b08ff9 100644 --- a/src/type/definition.js +++ b/src/type/definition.js @@ -614,6 +614,7 @@ export type GraphQLFieldConfig = { type: GraphQLOutputType; args?: GraphQLFieldConfigArgumentMap; resolve?: GraphQLFieldResolver; + subscribe?: GraphQLFieldResolver; deprecationReason?: ?string; description?: ?string; }; @@ -638,6 +639,7 @@ export type GraphQLField = { type: GraphQLOutputType; args: Array; resolve?: GraphQLFieldResolver; + subscribe?: GraphQLFieldResolver; isDeprecated?: boolean; deprecationReason?: ?string; };