From f05509e8c526ce44656389ab9997a6e5ee957a3d Mon Sep 17 00:00:00 2001 From: Daniel Lee Date: Wed, 4 Dec 2024 09:48:50 -0800 Subject: [PATCH] Support streaming streaming responses for callable functions. (#8609) The new .stream() API allows the client to consume streaming responses from the WIP streaming callable functions in Firebase Functions Node.js SDK. When client makes a request to the callable function w/ header Accept: text/event-stream, the callable function responds with response chunks in Server-Sent Event format. The sdk changes here abstracts over the wire-protocol by parsing the response chunks and returning an instance of a AsyncIterable to consume to data: import { getFunctions, httpsCallable } from "firebase/functions"; const functions = getFunctions(); const generateText = httpsCallable(functions, 'generateText'); const resp = await generateText.stream( { text: 'What is your favorite Firebase service and why?' }, { signal: AbortSignal.timeout(60_000) }, ); try { for await (const message of resp.stream) { console.log(message); // prints "foo", "bar" } console.log(await resp.data) // prints "foo bar" } catch (e) { // FirebaseError(code='cancelled', message='Request was cancelled.'); console.error(e) } --- .changeset/bright-scissors-care.md | 6 + common/api-review/functions.api.md | 25 +- config/.gitignore | 1 + docs-devsite/_toc.yaml | 6 + docs-devsite/functions.httpscallable.md | 33 ++ .../functions.httpscallableoptions.md | 4 +- .../functions.httpscallablestreamoptions.md | 46 ++ .../functions.httpscallablestreamresult.md | 42 ++ docs-devsite/functions.md | 22 +- packages/functions/package.json | 2 +- packages/functions/src/api.ts | 17 +- packages/functions/src/callable.test.ts | 393 +++++++++++++++++- packages/functions/src/public-types.ts | 48 ++- packages/functions/src/service.ts | 334 +++++++++++++-- 14 files changed, 918 insertions(+), 61 deletions(-) create mode 100644 .changeset/bright-scissors-care.md create mode 100644 config/.gitignore create mode 100644 docs-devsite/functions.httpscallable.md create mode 100644 docs-devsite/functions.httpscallablestreamoptions.md create mode 100644 docs-devsite/functions.httpscallablestreamresult.md diff --git a/.changeset/bright-scissors-care.md b/.changeset/bright-scissors-care.md new file mode 100644 index 00000000000..b97b22b7248 --- /dev/null +++ b/.changeset/bright-scissors-care.md @@ -0,0 +1,6 @@ +--- +'@firebase/functions': minor +'firebase': minor +--- + +Add `.stream()` api for callable functions for consuming streaming responses. diff --git a/common/api-review/functions.api.md b/common/api-review/functions.api.md index 6133e017f67..883bde3bc0d 100644 --- a/common/api-review/functions.api.md +++ b/common/api-review/functions.api.md @@ -35,13 +35,18 @@ export type FunctionsErrorCodeCore = 'ok' | 'cancelled' | 'unknown' | 'invalid-a export function getFunctions(app?: FirebaseApp, regionOrCustomDomain?: string): Functions; // @public -export type HttpsCallable = (data?: RequestData | null) => Promise>; +export interface HttpsCallable { + // (undocumented) + (data?: RequestData | null): Promise>; + // (undocumented) + stream: (data?: RequestData | null, options?: HttpsCallableStreamOptions) => Promise>; +} // @public -export function httpsCallable(functionsInstance: Functions, name: string, options?: HttpsCallableOptions): HttpsCallable; +export function httpsCallable(functionsInstance: Functions, name: string, options?: HttpsCallableOptions): HttpsCallable; // @public -export function httpsCallableFromURL(functionsInstance: Functions, url: string, options?: HttpsCallableOptions): HttpsCallable; +export function httpsCallableFromURL(functionsInstance: Functions, url: string, options?: HttpsCallableOptions): HttpsCallable; // @public export interface HttpsCallableOptions { @@ -54,5 +59,19 @@ export interface HttpsCallableResult { readonly data: ResponseData; } +// @public +export interface HttpsCallableStreamOptions { + limitedUseAppCheckTokens?: boolean; + signal?: AbortSignal; +} + +// @public +export interface HttpsCallableStreamResult { + // (undocumented) + readonly data: Promise; + // (undocumented) + readonly stream: AsyncIterable; +} + ``` diff --git a/config/.gitignore b/config/.gitignore new file mode 100644 index 00000000000..54b6cccf34d --- /dev/null +++ b/config/.gitignore @@ -0,0 +1 @@ +# Uncomment this if you'd like others to create their own Firebase project. diff --git a/docs-devsite/_toc.yaml b/docs-devsite/_toc.yaml index a27f2832eb7..9d60c12906c 100644 --- a/docs-devsite/_toc.yaml +++ b/docs-devsite/_toc.yaml @@ -375,10 +375,16 @@ toc: path: /docs/reference/js/functions.functions.md - title: FunctionsError path: /docs/reference/js/functions.functionserror.md + - title: HttpsCallable + path: /docs/reference/js/functions.httpscallable.md - title: HttpsCallableOptions path: /docs/reference/js/functions.httpscallableoptions.md - title: HttpsCallableResult path: /docs/reference/js/functions.httpscallableresult.md + - title: HttpsCallableStreamOptions + path: /docs/reference/js/functions.httpscallablestreamoptions.md + - title: HttpsCallableStreamResult + path: /docs/reference/js/functions.httpscallablestreamresult.md - title: installations path: /docs/reference/js/installations.md section: diff --git a/docs-devsite/functions.httpscallable.md b/docs-devsite/functions.httpscallable.md new file mode 100644 index 00000000000..3b9d70f3a05 --- /dev/null +++ b/docs-devsite/functions.httpscallable.md @@ -0,0 +1,33 @@ +Project: /docs/reference/js/_project.yaml +Book: /docs/reference/_book.yaml +page_type: reference + +{% comment %} +DO NOT EDIT THIS FILE! +This is generated by the JS SDK team, and any local changes will be +overwritten. Changes should be made in the source code at +https://github.com/firebase/firebase-js-sdk +{% endcomment %} + +# HttpsCallable interface +A reference to a "callable" HTTP trigger in Cloud Functions. + +Signature: + +```typescript +export interface HttpsCallable +``` + +## Properties + +| Property | Type | Description | +| --- | --- | --- | +| [stream](./functions.httpscallable.md#httpscallablestream) | (data?: RequestData \| null, options?: [HttpsCallableStreamOptions](./functions.httpscallablestreamoptions.md#httpscallablestreamoptions_interface)) => Promise<[HttpsCallableStreamResult](./functions.httpscallablestreamresult.md#httpscallablestreamresult_interface)<ResponseData, StreamData>> | | + +## HttpsCallable.stream + +Signature: + +```typescript +stream: (data?: RequestData | null, options?: HttpsCallableStreamOptions) => Promise>; +``` diff --git a/docs-devsite/functions.httpscallableoptions.md b/docs-devsite/functions.httpscallableoptions.md index b4a261918ce..22933a2f1f0 100644 --- a/docs-devsite/functions.httpscallableoptions.md +++ b/docs-devsite/functions.httpscallableoptions.md @@ -22,12 +22,12 @@ export interface HttpsCallableOptions | Property | Type | Description | | --- | --- | --- | -| [limitedUseAppCheckTokens](./functions.httpscallableoptions.md#httpscallableoptionslimiteduseappchecktokens) | boolean | If set to true, uses limited-use App Check token for callable function requests from this instance of [Functions](./functions.functions.md#functions_interface). You must use limited-use tokens to call functions with replay protection enabled. By default, this is false. | +| [limitedUseAppCheckTokens](./functions.httpscallableoptions.md#httpscallableoptionslimiteduseappchecktokens) | boolean | If set to true, uses a limited-use App Check token for callable function requests from this instance of [Functions](./functions.functions.md#functions_interface). You must use limited-use tokens to call functions with replay protection enabled. By default, this is false. | | [timeout](./functions.httpscallableoptions.md#httpscallableoptionstimeout) | number | Time in milliseconds after which to cancel if there is no response. Default is 70000. | ## HttpsCallableOptions.limitedUseAppCheckTokens -If set to true, uses limited-use App Check token for callable function requests from this instance of [Functions](./functions.functions.md#functions_interface). You must use limited-use tokens to call functions with replay protection enabled. By default, this is false. +If set to true, uses a limited-use App Check token for callable function requests from this instance of [Functions](./functions.functions.md#functions_interface). You must use limited-use tokens to call functions with replay protection enabled. By default, this is false. Signature: diff --git a/docs-devsite/functions.httpscallablestreamoptions.md b/docs-devsite/functions.httpscallablestreamoptions.md new file mode 100644 index 00000000000..6c790c7e0f9 --- /dev/null +++ b/docs-devsite/functions.httpscallablestreamoptions.md @@ -0,0 +1,46 @@ +Project: /docs/reference/js/_project.yaml +Book: /docs/reference/_book.yaml +page_type: reference + +{% comment %} +DO NOT EDIT THIS FILE! +This is generated by the JS SDK team, and any local changes will be +overwritten. Changes should be made in the source code at +https://github.com/firebase/firebase-js-sdk +{% endcomment %} + +# HttpsCallableStreamOptions interface +An interface for metadata about how a stream call should be executed. + +Signature: + +```typescript +export interface HttpsCallableStreamOptions +``` + +## Properties + +| Property | Type | Description | +| --- | --- | --- | +| [limitedUseAppCheckTokens](./functions.httpscallablestreamoptions.md#httpscallablestreamoptionslimiteduseappchecktokens) | boolean | If set to true, uses a limited-use App Check token for callable function requests from this instance of [Functions](./functions.functions.md#functions_interface). You must use limited-use tokens to call functions with replay protection enabled. By default, this is false. | +| [signal](./functions.httpscallablestreamoptions.md#httpscallablestreamoptionssignal) | AbortSignal | An AbortSignal that can be used to cancel the streaming response. When the signal is aborted, the underlying HTTP connection will be terminated. | + +## HttpsCallableStreamOptions.limitedUseAppCheckTokens + +If set to true, uses a limited-use App Check token for callable function requests from this instance of [Functions](./functions.functions.md#functions_interface). You must use limited-use tokens to call functions with replay protection enabled. By default, this is false. + +Signature: + +```typescript +limitedUseAppCheckTokens?: boolean; +``` + +## HttpsCallableStreamOptions.signal + +An `AbortSignal` that can be used to cancel the streaming response. When the signal is aborted, the underlying HTTP connection will be terminated. + +Signature: + +```typescript +signal?: AbortSignal; +``` diff --git a/docs-devsite/functions.httpscallablestreamresult.md b/docs-devsite/functions.httpscallablestreamresult.md new file mode 100644 index 00000000000..ba0d852041b --- /dev/null +++ b/docs-devsite/functions.httpscallablestreamresult.md @@ -0,0 +1,42 @@ +Project: /docs/reference/js/_project.yaml +Book: /docs/reference/_book.yaml +page_type: reference + +{% comment %} +DO NOT EDIT THIS FILE! +This is generated by the JS SDK team, and any local changes will be +overwritten. Changes should be made in the source code at +https://github.com/firebase/firebase-js-sdk +{% endcomment %} + +# HttpsCallableStreamResult interface +An `HttpsCallableStreamResult` wraps a single streaming result from a function call. + +Signature: + +```typescript +export interface HttpsCallableStreamResult +``` + +## Properties + +| Property | Type | Description | +| --- | --- | --- | +| [data](./functions.httpscallablestreamresult.md#httpscallablestreamresultdata) | Promise<ResponseData> | | +| [stream](./functions.httpscallablestreamresult.md#httpscallablestreamresultstream) | AsyncIterable<StreamData> | | + +## HttpsCallableStreamResult.data + +Signature: + +```typescript +readonly data: Promise; +``` + +## HttpsCallableStreamResult.stream + +Signature: + +```typescript +readonly stream: AsyncIterable; +``` diff --git a/docs-devsite/functions.md b/docs-devsite/functions.md index 4887fcd4911..7e2eefa1569 100644 --- a/docs-devsite/functions.md +++ b/docs-devsite/functions.md @@ -34,8 +34,11 @@ Cloud Functions for Firebase | Interface | Description | | --- | --- | | [Functions](./functions.functions.md#functions_interface) | A Functions instance. | +| [HttpsCallable](./functions.httpscallable.md#httpscallable_interface) | A reference to a "callable" HTTP trigger in Cloud Functions. | | [HttpsCallableOptions](./functions.httpscallableoptions.md#httpscallableoptions_interface) | An interface for metadata about how calls should be executed. | | [HttpsCallableResult](./functions.httpscallableresult.md#httpscallableresult_interface) | An HttpsCallableResult wraps a single result from a function call. | +| [HttpsCallableStreamOptions](./functions.httpscallablestreamoptions.md#httpscallablestreamoptions_interface) | An interface for metadata about how a stream call should be executed. | +| [HttpsCallableStreamResult](./functions.httpscallablestreamresult.md#httpscallablestreamresult_interface) | An HttpsCallableStreamResult wraps a single streaming result from a function call. | ## Type Aliases @@ -43,7 +46,6 @@ Cloud Functions for Firebase | --- | --- | | [FunctionsErrorCode](./functions.md#functionserrorcode) | The set of Firebase Functions status codes. The codes are the same at the ones exposed by gRPC here: https://github.com/grpc/grpc/blob/master/doc/statuscodes.mdPossible values: - 'cancelled': The operation was cancelled (typically by the caller). - 'unknown': Unknown error or an error from a different error domain. - 'invalid-argument': Client specified an invalid argument. Note that this differs from 'failed-precondition'. 'invalid-argument' indicates arguments that are problematic regardless of the state of the system (e.g. an invalid field name). - 'deadline-exceeded': Deadline expired before operation could complete. For operations that change the state of the system, this error may be returned even if the operation has completed successfully. For example, a successful response from a server could have been delayed long enough for the deadline to expire. - 'not-found': Some requested document was not found. - 'already-exists': Some document that we attempted to create already exists. - 'permission-denied': The caller does not have permission to execute the specified operation. - 'resource-exhausted': Some resource has been exhausted, perhaps a per-user quota, or perhaps the entire file system is out of space. - 'failed-precondition': Operation was rejected because the system is not in a state required for the operation's execution. - 'aborted': The operation was aborted, typically due to a concurrency issue like transaction aborts, etc. - 'out-of-range': Operation was attempted past the valid range. - 'unimplemented': Operation is not implemented or not supported/enabled. - 'internal': Internal errors. Means some invariants expected by underlying system has been broken. If you see one of these errors, something is very broken. - 'unavailable': The service is currently unavailable. This is most likely a transient condition and may be corrected by retrying with a backoff. - 'data-loss': Unrecoverable data loss or corruption. - 'unauthenticated': The request does not have valid authentication credentials for the operation. | | [FunctionsErrorCodeCore](./functions.md#functionserrorcodecore) | Functions error code string appended after "functions/" product prefix. See [FunctionsErrorCode](./functions.md#functionserrorcode) for full documentation of codes. | -| [HttpsCallable](./functions.md#httpscallable) | A reference to a "callable" HTTP trigger in Google Cloud Functions. | ## function(app, ...) @@ -101,7 +103,7 @@ Returns a reference to the callable HTTPS trigger with the given name. Signature: ```typescript -export declare function httpsCallable(functionsInstance: Functions, name: string, options?: HttpsCallableOptions): HttpsCallable; +export declare function httpsCallable(functionsInstance: Functions, name: string, options?: HttpsCallableOptions): HttpsCallable; ``` #### Parameters @@ -114,7 +116,7 @@ export declare function httpsCallableReturns: -[HttpsCallable](./functions.md#httpscallable)<RequestData, ResponseData> +[HttpsCallable](./functions.httpscallable.md#httpscallable_interface)<RequestData, ResponseData, StreamData> ### httpsCallableFromURL(functionsInstance, url, options) {:#httpscallablefromurl_7af6987} @@ -123,7 +125,7 @@ Returns a reference to the callable HTTPS trigger with the specified url. Signature: ```typescript -export declare function httpsCallableFromURL(functionsInstance: Functions, url: string, options?: HttpsCallableOptions): HttpsCallable; +export declare function httpsCallableFromURL(functionsInstance: Functions, url: string, options?: HttpsCallableOptions): HttpsCallable; ``` #### Parameters @@ -136,7 +138,7 @@ export declare function httpsCallableFromURLReturns: -[HttpsCallable](./functions.md#httpscallable)<RequestData, ResponseData> +[HttpsCallable](./functions.httpscallable.md#httpscallable_interface)<RequestData, ResponseData, StreamData> ## FunctionsErrorCode @@ -159,13 +161,3 @@ Functions error code string appended after "functions/" product prefix. See [Fun ```typescript export type FunctionsErrorCodeCore = 'ok' | 'cancelled' | 'unknown' | 'invalid-argument' | 'deadline-exceeded' | 'not-found' | 'already-exists' | 'permission-denied' | 'resource-exhausted' | 'failed-precondition' | 'aborted' | 'out-of-range' | 'unimplemented' | 'internal' | 'unavailable' | 'data-loss' | 'unauthenticated'; ``` - -## HttpsCallable - -A reference to a "callable" HTTP trigger in Google Cloud Functions. - -Signature: - -```typescript -export type HttpsCallable = (data?: RequestData | null) => Promise>; -``` diff --git a/packages/functions/package.json b/packages/functions/package.json index 62ab7ff0000..cc53d9120fb 100644 --- a/packages/functions/package.json +++ b/packages/functions/package.json @@ -37,7 +37,7 @@ "test:browser": "karma start", "test:browser:debug": "karma start --browsers=Chrome --auto-watch", "test:node": "TS_NODE_COMPILER_OPTIONS='{\"module\":\"commonjs\"}' nyc --reporter lcovonly -- mocha 'src/{,!(browser)/**/}*.test.ts' --file src/index.ts --config ../../config/mocharc.node.js", - "test:emulator": "env FIREBASE_FUNCTIONS_EMULATOR_ORIGIN=http://localhost:5005 run-p --npm-path npm test:node", + "test:emulator": "env FIREBASE_FUNCTIONS_EMULATOR_ORIGIN=http://127.0.0.1:5005 run-p --npm-path npm test:node", "trusted-type-check": "tsec -p tsconfig.json --noEmit", "api-report": "api-extractor run --local --verbose", "doc": "api-documenter markdown --input temp --output docs", diff --git a/packages/functions/src/api.ts b/packages/functions/src/api.ts index a7804c2f573..7f92cba8343 100644 --- a/packages/functions/src/api.ts +++ b/packages/functions/src/api.ts @@ -88,12 +88,16 @@ export function connectFunctionsEmulator( * @param name - The name of the trigger. * @public */ -export function httpsCallable( +export function httpsCallable< + RequestData = unknown, + ResponseData = unknown, + StreamData = unknown +>( functionsInstance: Functions, name: string, options?: HttpsCallableOptions -): HttpsCallable { - return _httpsCallable( +): HttpsCallable { + return _httpsCallable( getModularInstance(functionsInstance as FunctionsService), name, options @@ -107,13 +111,14 @@ export function httpsCallable( */ export function httpsCallableFromURL< RequestData = unknown, - ResponseData = unknown + ResponseData = unknown, + StreamData = unknown >( functionsInstance: Functions, url: string, options?: HttpsCallableOptions -): HttpsCallable { - return _httpsCallableFromURL( +): HttpsCallable { + return _httpsCallableFromURL( getModularInstance(functionsInstance as FunctionsService), url, options diff --git a/packages/functions/src/callable.test.ts b/packages/functions/src/callable.test.ts index 439e7d4f154..b969304c89e 100644 --- a/packages/functions/src/callable.test.ts +++ b/packages/functions/src/callable.test.ts @@ -37,7 +37,7 @@ import { AppCheckInternalComponentName } from '@firebase/app-check-interop-types'; import { makeFakeApp, createTestService } from '../test/utils'; -import { httpsCallable } from './service'; +import { FunctionsService, httpsCallable } from './service'; import { FUNCTIONS_TYPE } from './constants'; import { FunctionsError } from './error'; @@ -299,3 +299,394 @@ describe('Firebase Functions > Call', () => { await expectError(func(), 'deadline-exceeded', 'deadline-exceeded'); }); }); + +describe('Firebase Functions > Stream', () => { + let app: FirebaseApp; + let functions: FunctionsService; + let mockFetch: sinon.SinonStub; + const region = 'us-central1'; + + beforeEach(() => { + const useEmulator = !!process.env.FIREBASE_FUNCTIONS_EMULATOR_ORIGIN; + const projectId = useEmulator + ? 'functions-integration-test' + : TEST_PROJECT.projectId; + const messagingSenderId = 'messaging-sender-id'; + app = makeFakeApp({ projectId, messagingSenderId }); + functions = createTestService(app, region); + mockFetch = sinon.stub(functions, 'fetchImpl' as any); + }); + + afterEach(() => { + mockFetch.restore(); + }); + + it('successfully streams data and resolves final result', async () => { + const mockResponse = new ReadableStream({ + start(controller) { + controller.enqueue( + new TextEncoder().encode('data: {"message":"Hello"}\n') + ); + controller.enqueue( + new TextEncoder().encode('data: {"message":"World"}\n') + ); + controller.enqueue( + new TextEncoder().encode('data: {"result":"Final Result"}\n') + ); + controller.close(); + } + }); + + mockFetch.resolves({ + body: mockResponse, + headers: new Headers({ 'Content-Type': 'text/event-stream' }), + status: 200, + statusText: 'OK' + } as Response); + + const func = httpsCallable, string, string>( + functions, + 'streamTest' + ); + const streamResult = await func.stream({}); + + const messages: string[] = []; + for await (const message of streamResult.stream) { + messages.push(message); + } + + expect(messages).to.deep.equal(['Hello', 'World']); + expect(await streamResult.data).to.equal('Final Result'); + }); + + it('successfully process request chunk with multiple events', async () => { + const mockResponse = new ReadableStream({ + start(controller) { + controller.enqueue( + new TextEncoder().encode( + 'data: {"message":"Hello"}\n\ndata: {"message":"World"}\n' + ) + ); + controller.enqueue( + new TextEncoder().encode('data: {"result":"Final Result"}\n') + ); + controller.close(); + } + }); + + mockFetch.resolves({ + body: mockResponse, + headers: new Headers({ 'Content-Type': 'text/event-stream' }), + status: 200, + statusText: 'OK' + } as Response); + + const func = httpsCallable, string, string>( + functions, + 'streamTest' + ); + const streamResult = await func.stream({}); + + const messages: string[] = []; + for await (const message of streamResult.stream) { + messages.push(message); + } + + expect(messages).to.deep.equal(['Hello', 'World']); + expect(await streamResult.data).to.equal('Final Result'); + }); + + it('handles network errors', async () => { + mockFetch.rejects(new Error('Network error')); + + const func = httpsCallable, string, string>( + functions, + 'errTest' + ); + const streamResult = await func.stream({}); + + let errorThrown = false; + try { + for await (const _ of streamResult.stream) { + // This should not execute + } + } catch (error: unknown) { + errorThrown = true; + expect((error as FunctionsError).code).to.equal( + `${FUNCTIONS_TYPE}/internal` + ); + } + expect(errorThrown).to.be.true; + await expectError(streamResult.data, 'internal', 'internal'); + }); + + it('handles server-side errors', async () => { + const mockResponse = new ReadableStream({ + start(controller) { + controller.enqueue( + new TextEncoder().encode( + 'data: {"error":{"status":"INVALID_ARGUMENT","message":"Invalid input"}}\n' + ) + ); + controller.close(); + } + }); + + mockFetch.resolves({ + body: mockResponse, + headers: new Headers({ 'Content-Type': 'text/event-stream' }), + status: 200, + statusText: 'OK' + } as Response); + + const func = httpsCallable, string, string>( + functions, + 'stream' + ); + const streamResult = await func.stream({}); + + let errorThrown = false; + try { + for await (const _ of streamResult.stream) { + // This should not execute + } + } catch (error) { + errorThrown = true; + expect((error as FunctionsError).code).to.equal( + `${FUNCTIONS_TYPE}/invalid-argument` + ); + expect((error as FunctionsError).message).to.equal('Invalid input'); + } + + expect(errorThrown).to.be.true; + await expectError(streamResult.data, 'invalid-argument', 'Invalid input'); + }); + + it('includes authentication and app check tokens in request headers', async () => { + const authMock: FirebaseAuthInternal = { + getToken: async () => ({ accessToken: 'auth-token' }) + } as unknown as FirebaseAuthInternal; + const authProvider = new Provider( + 'auth-internal', + new ComponentContainer('test') + ); + authProvider.setComponent( + new Component('auth-internal', () => authMock, ComponentType.PRIVATE) + ); + const appCheckMock: FirebaseAppCheckInternal = { + getToken: async () => ({ token: 'app-check-token' }) + } as unknown as FirebaseAppCheckInternal; + const appCheckProvider = new Provider( + 'app-check-internal', + new ComponentContainer('test') + ); + appCheckProvider.setComponent( + new Component( + 'app-check-internal', + () => appCheckMock, + ComponentType.PRIVATE + ) + ); + + const functions = createTestService( + app, + region, + authProvider, + undefined, + appCheckProvider + ); + const mockFetch = sinon.stub(functions, 'fetchImpl' as any); + + const mockResponse = new ReadableStream({ + start(controller) { + controller.enqueue( + new TextEncoder().encode('data: {"result":"Success"}\n') + ); + controller.close(); + } + }); + + mockFetch.resolves({ + body: mockResponse, + headers: new Headers({ 'Content-Type': 'text/event-stream' }), + status: 200, + statusText: 'OK' + } as Response); + + const func = httpsCallable, string, string>( + functions, + 'stream' + ); + await func.stream({}); + + expect(mockFetch.calledOnce).to.be.true; + const [_, options] = mockFetch.firstCall.args; + expect(options.headers['Authorization']).to.equal('Bearer auth-token'); + expect(options.headers['Content-Type']).to.equal('application/json'); + expect(options.headers['Accept']).to.equal('text/event-stream'); + }); + + it('aborts during initial fetch', async () => { + const controller = new AbortController(); + + // Create a fetch that rejects when aborted + const fetchPromise = new Promise((_, reject) => { + controller.signal.addEventListener('abort', () => { + const error = new Error('The operation was aborted'); + error.name = 'AbortError'; + reject(error); + }); + }); + mockFetch.returns(fetchPromise); + + const func = httpsCallable, string, string>( + functions, + 'streamTest' + ); + const streamPromise = func.stream({}, { signal: controller.signal }); + + controller.abort(); + + const streamResult = await streamPromise; + + // Verify fetch was called with abort signal + expect(mockFetch.calledOnce).to.be.true; + const [_, options] = mockFetch.firstCall.args; + expect(options.signal).to.equal(controller.signal); + + // Verify stream iteration throws AbortError + let errorThrown = false; + try { + for await (const _ of streamResult.stream) { + // Should not execute + } + } catch (error) { + errorThrown = true; + expect((error as FunctionsError).code).to.equal( + `${FUNCTIONS_TYPE}/cancelled` + ); + } + expect(errorThrown).to.be.true; + await expectError(streamResult.data, 'cancelled', 'Request was cancelled.'); + }); + + it('aborts during streaming', async () => { + const controller = new AbortController(); + + const mockResponse = new ReadableStream({ + async start(controller) { + controller.enqueue( + new TextEncoder().encode('data: {"message":"First"}\n') + ); + // Add delay to simulate network latency + await new Promise(resolve => setTimeout(resolve, 50)); + controller.enqueue( + new TextEncoder().encode('data: {"message":"Second"}\n') + ); + await new Promise(resolve => setTimeout(resolve, 50)); + controller.enqueue( + new TextEncoder().encode('data: {"result":"Final"}\n') + ); + controller.close(); + } + }); + + mockFetch.resolves({ + body: mockResponse, + headers: new Headers({ 'Content-Type': 'text/event-stream' }), + status: 200, + statusText: 'OK' + } as Response); + + const func = httpsCallable, string, string>( + functions, + 'streamTest' + ); + const streamResult = await func.stream({}, { signal: controller.signal }); + + const messages: string[] = []; + try { + for await (const message of streamResult.stream) { + messages.push(message); + if (messages.length === 1) { + // Abort after receiving first message + controller.abort(); + } + } + throw new Error('Stream should have been aborted'); + } catch (error) { + expect((error as FunctionsError).code).to.equal( + `${FUNCTIONS_TYPE}/cancelled` + ); + } + expect(messages).to.deep.equal(['First']); + await expectError(streamResult.data, 'cancelled', 'Request was cancelled.'); + }); + + it('fails immediately with pre-aborted signal', async () => { + mockFetch.callsFake((url: string, options: RequestInit) => { + if (options.signal?.aborted) { + const error = new Error('The operation was aborted'); + error.name = 'AbortError'; + return Promise.reject(error); + } + return Promise.resolve(new Response()); + }); + const func = httpsCallable, string, string>( + functions, + 'streamTest' + ); + const streamResult = await func.stream({}, { signal: AbortSignal.abort() }); + + let errorThrown = false; + try { + for await (const _ of streamResult.stream) { + // Should not execute + } + } catch (error) { + errorThrown = true; + expect((error as FunctionsError).code).to.equal( + `${FUNCTIONS_TYPE}/cancelled` + ); + } + expect(errorThrown).to.be.true; + await expectError(streamResult.data, 'cancelled', 'Request was cancelled.'); + }); + + it('properly handles AbortSignal.timeout()', async () => { + const timeoutMs = 50; + const signal = AbortSignal.timeout(timeoutMs); + + mockFetch.callsFake(async (url: string, options: RequestInit) => { + await new Promise((resolve, reject) => { + options.signal?.addEventListener('abort', () => { + const error = new Error('The operation was aborted'); + error.name = 'AbortError'; + reject(error); + }); + setTimeout(resolve, timeoutMs * 3); + }); + + // If we get here, timeout didn't occur + return new Response(); + }); + + const func = httpsCallable, string, string>( + functions, + 'streamTest' + ); + const streamResult = await func.stream({}, { signal }); + + try { + for await (const _ of streamResult.stream) { + // Should not execute + } + throw new Error('Stream should have timed out'); + } catch (error) { + expect((error as FunctionsError).code).to.equal( + `${FUNCTIONS_TYPE}/cancelled` + ); + } + await expectError(streamResult.data, 'cancelled', 'Request was cancelled.'); + }); +}); diff --git a/packages/functions/src/public-types.ts b/packages/functions/src/public-types.ts index 311493d5fda..50b2d9a9e0c 100644 --- a/packages/functions/src/public-types.ts +++ b/packages/functions/src/public-types.ts @@ -28,13 +28,33 @@ export interface HttpsCallableResult { } /** - * A reference to a "callable" HTTP trigger in Google Cloud Functions. + * An `HttpsCallableStreamResult` wraps a single streaming result from a function call. + * @public + */ +export interface HttpsCallableStreamResult< + ResponseData = unknown, + StreamData = unknown +> { + readonly data: Promise; + readonly stream: AsyncIterable; +} + +/** + * A reference to a "callable" HTTP trigger in Cloud Functions. * @param data - Data to be passed to callable function. * @public */ -export type HttpsCallable = ( - data?: RequestData | null -) => Promise>; +export interface HttpsCallable< + RequestData = unknown, + ResponseData = unknown, + StreamData = unknown +> { + (data?: RequestData | null): Promise>; + stream: ( + data?: RequestData | null, + options?: HttpsCallableStreamOptions + ) => Promise>; +} /** * An interface for metadata about how calls should be executed. @@ -47,7 +67,25 @@ export interface HttpsCallableOptions { */ timeout?: number; /** - * If set to true, uses limited-use App Check token for callable function requests from this + * If set to true, uses a limited-use App Check token for callable function requests from this + * instance of {@link Functions}. You must use limited-use tokens to call functions with + * replay protection enabled. By default, this is false. + */ + limitedUseAppCheckTokens?: boolean; +} + +/** + * An interface for metadata about how a stream call should be executed. + * @public + */ +export interface HttpsCallableStreamOptions { + /** + * An `AbortSignal` that can be used to cancel the streaming response. When the signal is aborted, + * the underlying HTTP connection will be terminated. + */ + signal?: AbortSignal; + /** + * If set to true, uses a limited-use App Check token for callable function requests from this * instance of {@link Functions}. You must use limited-use tokens to call functions with * replay protection enabled. By default, this is false. */ diff --git a/packages/functions/src/service.ts b/packages/functions/src/service.ts index 986dcbc735d..ec459472b5a 100644 --- a/packages/functions/src/service.ts +++ b/packages/functions/src/service.ts @@ -19,7 +19,9 @@ import { FirebaseApp, _FirebaseService } from '@firebase/app'; import { HttpsCallable, HttpsCallableResult, - HttpsCallableOptions + HttpsCallableStreamResult, + HttpsCallableOptions, + HttpsCallableStreamOptions } from './public-types'; import { _errorForResponse, FunctionsError } from './error'; import { ContextProvider } from './context'; @@ -31,6 +33,8 @@ import { AppCheckInternalComponentName } from '@firebase/app-check-interop-types export const DEFAULT_REGION = 'us-central1'; +const responseLineRE = /^data: (.*?)(?:\n|$)/; + /** * The response to an http request. */ @@ -104,7 +108,8 @@ export class FunctionsService implements _FirebaseService { authProvider: Provider, messagingProvider: Provider, appCheckProvider: Provider, - regionOrCustomDomain: string = DEFAULT_REGION + regionOrCustomDomain: string = DEFAULT_REGION, + readonly fetchImpl: typeof fetch = (...args) => fetch(...args) ) { this.contextProvider = new ContextProvider( authProvider, @@ -176,14 +181,25 @@ export function connectFunctionsEmulator( * @param name - The name of the trigger. * @public */ -export function httpsCallable( +export function httpsCallable( functionsInstance: FunctionsService, name: string, options?: HttpsCallableOptions -): HttpsCallable { - return (data => { +): HttpsCallable { + const callable = ( + data?: RequestData | null + ): Promise => { return call(functionsInstance, name, data, options || {}); - }) as HttpsCallable; + }; + + callable.stream = ( + data?: RequestData | null, + options?: HttpsCallableStreamOptions + ) => { + return stream(functionsInstance, name, data, options); + }; + + return callable as HttpsCallable; } /** @@ -191,14 +207,28 @@ export function httpsCallable( * @param url - The url of the trigger. * @public */ -export function httpsCallableFromURL( +export function httpsCallableFromURL< + RequestData, + ResponseData, + StreamData = unknown +>( functionsInstance: FunctionsService, url: string, options?: HttpsCallableOptions -): HttpsCallable { - return (data => { +): HttpsCallable { + const callable = ( + data?: RequestData | null + ): Promise => { return callAtURL(functionsInstance, url, data, options || {}); - }) as HttpsCallable; + }; + + callable.stream = ( + data?: RequestData | null, + options?: HttpsCallableStreamOptions + ) => { + return streamAtURL(functionsInstance, url, data, options || {}); + }; + return callable as HttpsCallable; } /** @@ -211,13 +241,14 @@ export function httpsCallableFromURL( async function postJSON( url: string, body: unknown, - headers: { [key: string]: string } + headers: { [key: string]: string }, + fetchImpl: typeof fetch ): Promise { headers['Content-Type'] = 'application/json'; let response: Response; try { - response = await fetch(url, { + response = await fetchImpl(url, { method: 'POST', body: JSON.stringify(body), headers @@ -244,10 +275,36 @@ async function postJSON( }; } +/** + * Creates authorization headers for Firebase Functions requests. + * @param functionsInstance The Firebase Functions service instance. + * @param options Options for the callable function, including AppCheck token settings. + * @return A Promise that resolves a headers map to include in outgoing fetch request. + */ +async function makeAuthHeaders( + functionsInstance: FunctionsService, + options: HttpsCallableOptions +): Promise> { + const headers: Record = {}; + const context = await functionsInstance.contextProvider.getContext( + options.limitedUseAppCheckTokens + ); + if (context.authToken) { + headers['Authorization'] = 'Bearer ' + context.authToken; + } + if (context.messagingToken) { + headers['Firebase-Instance-ID-Token'] = context.messagingToken; + } + if (context.appCheckToken !== null) { + headers['X-Firebase-AppCheck'] = context.appCheckToken; + } + return headers; +} + /** * Calls a callable function asynchronously and returns the result. * @param name The name of the callable trigger. - * @param data The data to pass as params to the function.s + * @param data The data to pass as params to the function. */ function call( functionsInstance: FunctionsService, @@ -262,7 +319,7 @@ function call( /** * Calls a callable function asynchronously and returns the result. * @param url The url of the callable trigger. - * @param data The data to pass as params to the function.s + * @param data The data to pass as params to the function. */ async function callAtURL( functionsInstance: FunctionsService, @@ -275,26 +332,14 @@ async function callAtURL( const body = { data }; // Add a header for the authToken. - const headers: { [key: string]: string } = {}; - const context = await functionsInstance.contextProvider.getContext( - options.limitedUseAppCheckTokens - ); - if (context.authToken) { - headers['Authorization'] = 'Bearer ' + context.authToken; - } - if (context.messagingToken) { - headers['Firebase-Instance-ID-Token'] = context.messagingToken; - } - if (context.appCheckToken !== null) { - headers['X-Firebase-AppCheck'] = context.appCheckToken; - } + const headers = await makeAuthHeaders(functionsInstance, options); // Default timeout to 70s, but let the options override it. const timeout = options.timeout || 70000; const failAfterHandle = failAfter(timeout); const response = await Promise.race([ - postJSON(url, body, headers), + postJSON(url, body, headers, functionsInstance.fetchImpl), failAfterHandle.promise, functionsInstance.cancelAllRequests ]); @@ -336,3 +381,236 @@ async function callAtURL( return { data: decodedData }; } + +/** + * Calls a callable function asynchronously and returns a streaming result. + * @param name The name of the callable trigger. + * @param data The data to pass as params to the function. + * @param options Streaming request options. + */ +function stream( + functionsInstance: FunctionsService, + name: string, + data: unknown, + options?: HttpsCallableStreamOptions +): Promise { + const url = functionsInstance._url(name); + return streamAtURL(functionsInstance, url, data, options || {}); +} + +/** + * Calls a callable function asynchronously and return a streaming result. + * @param url The url of the callable trigger. + * @param data The data to pass as params to the function. + * @param options Streaming request options. + */ +async function streamAtURL( + functionsInstance: FunctionsService, + url: string, + data: unknown, + options: HttpsCallableStreamOptions +): Promise { + // Encode any special types, such as dates, in the input data. + data = encode(data); + const body = { data }; + // + // Add a header for the authToken. + const headers = await makeAuthHeaders(functionsInstance, options); + headers['Content-Type'] = 'application/json'; + headers['Accept'] = 'text/event-stream'; + + let response: Response; + try { + response = await functionsInstance.fetchImpl(url, { + method: 'POST', + body: JSON.stringify(body), + headers, + signal: options?.signal + }); + } catch (e) { + if (e instanceof Error && e.name === 'AbortError') { + const error = new FunctionsError('cancelled', 'Request was cancelled.'); + return { + data: Promise.reject(error), + stream: { + [Symbol.asyncIterator]() { + return { + next() { + return Promise.reject(error); + } + }; + } + } + }; + } + // This could be an unhandled error on the backend, or it could be a + // network error. There's no way to know, since an unhandled error on the + // backend will fail to set the proper CORS header, and thus will be + // treated as a network error by fetch. + const error = _errorForResponse(0, null); + return { + data: Promise.reject(error), + // Return an empty async iterator + stream: { + [Symbol.asyncIterator]() { + return { + next() { + return Promise.reject(error); + } + }; + } + } + }; + } + let resultResolver: (value: unknown) => void; + let resultRejecter: (reason: unknown) => void; + const resultPromise = new Promise((resolve, reject) => { + resultResolver = resolve; + resultRejecter = reject; + }); + options?.signal?.addEventListener('abort', () => { + const error = new FunctionsError('cancelled', 'Request was cancelled.'); + resultRejecter(error); + }); + const reader = response.body!.getReader(); + const rstream = createResponseStream( + reader, + resultResolver!, + resultRejecter!, + options?.signal + ); + return { + stream: { + [Symbol.asyncIterator]() { + const rreader = rstream.getReader(); + return { + async next() { + const { value, done } = await rreader.read(); + return { value: value as unknown, done }; + }, + async return() { + await rreader.cancel(); + return { done: true, value: undefined }; + } + }; + } + }, + data: resultPromise + }; +} + +/** + * Creates a ReadableStream that processes a streaming response from a streaming + * callable function that returns data in server-sent event format. + * + * @param reader The underlying reader providing raw response data + * @param resultResolver Callback to resolve the final result when received + * @param resultRejecter Callback to reject with an error if encountered + * @param signal Optional AbortSignal to cancel the stream processing + * @returns A ReadableStream that emits decoded messages from the response + * + * The returned ReadableStream: + * 1. Emits individual messages when "message" data is received + * 2. Resolves with the final result when a "result" message is received + * 3. Rejects with an error if an "error" message is received + */ +function createResponseStream( + reader: ReadableStreamDefaultReader, + resultResolver: (value: unknown) => void, + resultRejecter: (reason: unknown) => void, + signal?: AbortSignal +): ReadableStream { + const processLine = ( + line: string, + controller: ReadableStreamDefaultController + ): void => { + const match = line.match(responseLineRE); + // ignore all other lines (newline, comments, etc.) + if (!match) { + return; + } + const data = match[1]; + try { + const jsonData = JSON.parse(data); + if ('result' in jsonData) { + resultResolver(decode(jsonData.result)); + return; + } + if ('message' in jsonData) { + controller.enqueue(decode(jsonData.message)); + return; + } + if ('error' in jsonData) { + const error = _errorForResponse(0, jsonData); + controller.error(error); + resultRejecter(error); + return; + } + } catch (error) { + if (error instanceof FunctionsError) { + controller.error(error); + resultRejecter(error); + return; + } + // ignore other parsing errors + } + }; + + const decoder = new TextDecoder(); + return new ReadableStream({ + start(controller) { + let currentText = ''; + return pump(); + async function pump(): Promise { + if (signal?.aborted) { + const error = new FunctionsError( + 'cancelled', + 'Request was cancelled' + ); + controller.error(error); + resultRejecter(error); + return Promise.resolve(); + } + try { + const { value, done } = await reader.read(); + if (done) { + if (currentText.trim()) { + processLine(currentText.trim(), controller); + } + controller.close(); + return; + } + if (signal?.aborted) { + const error = new FunctionsError( + 'cancelled', + 'Request was cancelled' + ); + controller.error(error); + resultRejecter(error); + await reader.cancel(); + return; + } + currentText += decoder.decode(value, { stream: true }); + const lines = currentText.split('\n'); + currentText = lines.pop() || ''; + for (const line of lines) { + if (line.trim()) { + processLine(line.trim(), controller); + } + } + return pump(); + } catch (error) { + const functionsError = + error instanceof FunctionsError + ? error + : _errorForResponse(0, null); + controller.error(functionsError); + resultRejecter(functionsError); + } + } + }, + cancel() { + return reader.cancel(); + } + }); +}