Skip to content

Commit cddd1ef

Browse files
committed
feat: add gapic streaming calls
1 parent 6b81f5e commit cddd1ef

File tree

3 files changed

+258
-64
lines changed

3 files changed

+258
-64
lines changed

src/index.ts

+67-2
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ import {shouldRetryRequest} from './decorateStatus';
3333
import {google} from '../protos/protos';
3434
import {ServiceError} from 'google-gax';
3535
import * as v2 from './v2';
36-
import {PassThrough, Duplex} from 'stream';
36+
import {PassThrough, Duplex, Transform} from 'stream';
37+
// eslint-disable-next-line @typescript-eslint/no-var-requires
38+
const pumpify = require('pumpify');
3739

3840
// eslint-disable-next-line @typescript-eslint/no-var-requires
3941
const retryRequest = require('retry-request');
@@ -688,6 +690,64 @@ export class Bigtable {
688690
);
689691
}
690692

693+
/**
694+
* Get {@link Instance} objects for all of your Cloud Bigtable instances as
695+
* a readable object stream.
696+
*
697+
* @param {object} [gaxOptions] Request configuration options, outlined here:
698+
* https://googleapis.github.io/gax-nodejs/classes/CallSettings.html.
699+
* {@link Bigtable#getInstances} for a complete list of options.
700+
* @returns {stream}
701+
*
702+
* @example
703+
* const {Bigtable} = require('@google-cloud/bigtable');
704+
* const bigtable = new Bigtable();
705+
*
706+
* bigtable.getInstancesStream()
707+
* .on('error', console.error)
708+
* .on('data', function(instance) {
709+
* // instance is an Instance object.
710+
* })
711+
* .on('end', () => {
712+
* // All instances retrieved.
713+
* });
714+
*
715+
* //-
716+
* // If you anticipate many results, you can end a stream early to prevent
717+
* // unnecessary processing and API requests.
718+
* //-
719+
* bigtable.getInstancesStream()
720+
* .on('data', function(instance) {
721+
* this.end();
722+
* });
723+
*/
724+
getInstancesStream(gaxOptions?: CallOptions): NodeJS.ReadableStream {
725+
const reqOpts = {
726+
parent: this.projectName,
727+
};
728+
729+
// eslint-disable-next-line @typescript-eslint/no-this-alias
730+
const self = this;
731+
const transformToInstance = (
732+
chunk: IInstance,
733+
enc: string,
734+
callback: Function
735+
) => {
736+
const instance = self.instance(chunk.name!.split('/').pop()!);
737+
instance.metadata = chunk;
738+
callback(null, instance);
739+
};
740+
return pumpify.obj([
741+
this.request({
742+
client: 'BigtableInstanceAdminClient',
743+
method: 'listInstancesStream',
744+
reqOpts,
745+
gaxOpts: gaxOptions,
746+
}),
747+
new Transform({objectMode: true, transform: transformToInstance}),
748+
]);
749+
}
750+
691751
/**
692752
* Get a reference to a Cloud Bigtable instance.
693753
*
@@ -750,7 +810,12 @@ export class Bigtable {
750810
});
751811
};
752812

753-
const gapicStreamingMethods = {listTablesStream: true};
813+
const gapicStreamingMethods = {
814+
listTablesStream: true,
815+
listInstancesStream: true,
816+
listAppProfilesStream: true,
817+
listClustersStream: true,
818+
};
754819

755820
if (isStreamMode) {
756821
stream = streamEvents(new PassThrough({objectMode: true}));

src/instance.ts

+118
Original file line numberDiff line numberDiff line change
@@ -686,6 +686,65 @@ Please use the format 'my-instance' or '${bigtable.projectName}/instances/my-ins
686686
);
687687
}
688688

689+
/**
690+
* Get {@link AppProfile} objects for all the App Profiles in your
691+
* Cloud Bigtable instance as a readable object stream.
692+
*
693+
* @param {object} [gaxOptions] Request configuration options, outlined here:
694+
* https://googleapis.github.io/gax-nodejs/CallSettings.html.
695+
* {@link Instance#getAppProfiles} for a complete list of options.
696+
* @returns {stream}
697+
*
698+
* @example
699+
* const {Bigtable} = require('@google-cloud/bigtable');
700+
* const bigtable = new Bigtable();
701+
* const instance = bigtable.instance('my-instance');
702+
*
703+
* instance.getAppProfilesStream()
704+
* .on('error', console.error)
705+
* .on('data', function(appProfile) {
706+
* // appProfile is a AppProfile object.
707+
* })
708+
* .on('end', () => {
709+
* // All appProfiles retrieved.
710+
* });
711+
*
712+
* //-
713+
* // If you anticipate many results, you can end a stream early to prevent
714+
* // unnecessary processing and API requests.
715+
* //-
716+
* instance.getAppProfilesStream()
717+
* .on('data', function(appProfile) {
718+
* this.end();
719+
* });
720+
*/
721+
getAppProfilesStream(gaxOptions?: CallOptions): NodeJS.ReadableStream {
722+
const reqOpts = {
723+
parent: this.name,
724+
};
725+
726+
// eslint-disable-next-line @typescript-eslint/no-this-alias
727+
const self = this;
728+
const transformToAppProfile = (
729+
chunk: google.bigtable.admin.v2.IAppProfile,
730+
enc: string,
731+
callback: Function
732+
) => {
733+
const appProfile = self.appProfile(chunk.name!.split('/').pop()!);
734+
appProfile.metadata = chunk;
735+
callback(null, appProfile);
736+
};
737+
return pumpify.obj([
738+
this.bigtable.request({
739+
client: 'BigtableInstanceAdminClient',
740+
method: 'listAppProfilesStream',
741+
reqOpts,
742+
gaxOpts: gaxOptions,
743+
}),
744+
new Transform({objectMode: true, transform: transformToAppProfile}),
745+
]);
746+
}
747+
689748
getClusters(options?: CallOptions): Promise<GetClustersResponse>;
690749
getClusters(options: CallOptions, callback: GetClustersCallback): void;
691750
getClusters(callback: GetClustersCallback): void;
@@ -738,6 +797,65 @@ Please use the format 'my-instance' or '${bigtable.projectName}/instances/my-ins
738797
);
739798
}
740799

800+
/**
801+
* Get {@link Cluster} objects for all the Clusters in your
802+
* Cloud Bigtable instance as a readable object stream.
803+
*
804+
* @param {object} [gaxOptions] Request configuration options, outlined here:
805+
* https://googleapis.github.io/gax-nodejs/CallSettings.html.
806+
* {@link Instance#getClusters} for a complete list of options.
807+
* @returns {stream}
808+
*
809+
* @example
810+
* const {Bigtable} = require('@google-cloud/bigtable');
811+
* const bigtable = new Bigtable();
812+
* const instance = bigtable.instance('my-instance');
813+
*
814+
* instance.getClustersStream()
815+
* .on('error', console.error)
816+
* .on('data', function(cluster) {
817+
* // cluster is a AppProfile object.
818+
* })
819+
* .on('end', () => {
820+
* // All clusters retrieved.
821+
* });
822+
*
823+
* //-
824+
* // If you anticipate many results, you can end a stream early to prevent
825+
* // unnecessary processing and API requests.
826+
* //-
827+
* instance.getClustersStream()
828+
* .on('data', function(cluster) {
829+
* this.end();
830+
* });
831+
*/
832+
getClustersStream(gaxOptions?: CallOptions): NodeJS.ReadableStream {
833+
const reqOpts = {
834+
parent: this.name,
835+
};
836+
837+
// eslint-disable-next-line @typescript-eslint/no-this-alias
838+
const self = this;
839+
const transformToCluster = (
840+
chunk: google.bigtable.admin.v2.ICluster,
841+
enc: string,
842+
callback: Function
843+
) => {
844+
const cluster = self.cluster(chunk.name!.split('/').pop()!);
845+
cluster.metadata = chunk;
846+
callback(null, cluster);
847+
};
848+
return pumpify.obj([
849+
this.bigtable.request({
850+
client: 'BigtableInstanceAdminClient',
851+
method: 'listClustersStream',
852+
reqOpts,
853+
gaxOpts: gaxOptions,
854+
}),
855+
new Transform({objectMode: true, transform: transformToCluster}),
856+
]);
857+
}
858+
741859
getIamPolicy(options?: GetIamPolicyOptions): Promise<[Policy]>;
742860
getIamPolicy(
743861
options: GetIamPolicyOptions,

test/index.ts

+73-62
Original file line numberDiff line numberDiff line change
@@ -635,6 +635,13 @@ describe('Bigtable', () => {
635635
gaxOpts: {},
636636
};
637637

638+
const gapicStreamingMethods = [
639+
'listTablesStream',
640+
'listInstancesStream',
641+
'listAppProfilesStream',
642+
'listClustersStream',
643+
];
644+
638645
beforeEach(() => {
639646
bigtable.getProjectId_ = (callback: Function) => {
640647
callback(null, PROJECT_ID);
@@ -874,76 +881,80 @@ describe('Bigtable', () => {
874881
});
875882
});
876883

877-
describe('makeGapicStreamRequest', () => {
878-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
879-
let GAX_STREAM: any;
880-
const config = {
881-
client: 'client',
882-
method: 'listTablesStream',
883-
reqOpts: {
884-
a: 'b',
885-
c: 'd',
886-
},
887-
gaxOpts: {},
888-
};
889-
890-
beforeEach(() => {
891-
GAX_STREAM = new PassThrough();
892-
bigtable.api[config.client][config.method] = {
893-
bind() {
894-
return () => {
895-
return GAX_STREAM;
884+
gapicStreamingMethods.forEach(method => {
885+
describe('makeGapicStreamRequest', () => {
886+
describe(method, () => {
887+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
888+
let GAX_STREAM: any;
889+
const config = {
890+
client: 'client',
891+
method: method,
892+
reqOpts: {
893+
a: 'b',
894+
c: 'd',
895+
},
896+
gaxOpts: {},
897+
};
898+
899+
beforeEach(() => {
900+
GAX_STREAM = new PassThrough();
901+
bigtable.api[config.client][config.method] = {
902+
bind() {
903+
return () => {
904+
return GAX_STREAM;
905+
};
906+
},
896907
};
897-
},
898-
};
899-
});
908+
});
900909

901-
it('should expose an abort function', done => {
902-
GAX_STREAM.cancel = done;
910+
it('should expose an abort function', done => {
911+
GAX_STREAM.cancel = done;
903912

904-
const requestStream = bigtable.request(config);
905-
requestStream.emit('reading');
906-
requestStream.abort();
907-
});
913+
const requestStream = bigtable.request(config);
914+
requestStream.emit('reading');
915+
requestStream.abort();
916+
});
908917

909-
it('should prepare the request once reading', done => {
910-
bigtable.api[config.client][config.method] = {
911-
bind(gaxClient: {}, reqOpts: {}, gaxOpts: {}) {
912-
assert.strictEqual(gaxClient, bigtable.api[config.client]);
913-
assert.deepStrictEqual(reqOpts, config.reqOpts);
914-
assert.strictEqual(gaxOpts, config.gaxOpts);
915-
setImmediate(done);
916-
return () => {
917-
return GAX_STREAM;
918+
it('should prepare the request once reading', done => {
919+
bigtable.api[config.client][config.method] = {
920+
bind(gaxClient: {}, reqOpts: {}, gaxOpts: {}) {
921+
assert.strictEqual(gaxClient, bigtable.api[config.client]);
922+
assert.deepStrictEqual(reqOpts, config.reqOpts);
923+
assert.strictEqual(gaxOpts, config.gaxOpts);
924+
setImmediate(done);
925+
return () => {
926+
return GAX_STREAM;
927+
};
928+
},
918929
};
919-
},
920-
};
921930

922-
const requestStream = bigtable.request(config);
923-
requestStream.emit('reading');
924-
});
931+
const requestStream = bigtable.request(config);
932+
requestStream.emit('reading');
933+
});
925934

926-
it('should destroy the stream with prepare error', done => {
927-
const error = new Error('Error.');
928-
bigtable.getProjectId_ = (callback: Function) => {
929-
callback(error);
930-
};
931-
const requestStream = bigtable.request(config);
932-
requestStream.emit('reading');
933-
requestStream.on('error', (err: Error) => {
934-
assert.strictEqual(err, error);
935-
done();
936-
});
937-
});
935+
it('should destroy the stream with prepare error', done => {
936+
const error = new Error('Error.');
937+
bigtable.getProjectId_ = (callback: Function) => {
938+
callback(error);
939+
};
940+
const requestStream = bigtable.request(config);
941+
requestStream.emit('reading');
942+
requestStream.on('error', (err: Error) => {
943+
assert.strictEqual(err, error);
944+
done();
945+
});
946+
});
938947

939-
it('should destroy the stream with GAX error', done => {
940-
const error = new Error('Error.');
941-
const requestStream = bigtable.request(config);
942-
requestStream.emit('reading');
943-
GAX_STREAM.emit('error', error);
944-
requestStream.on('error', (err: Error) => {
945-
assert.strictEqual(err, error);
946-
done();
948+
it('should destroy the stream with GAX error', done => {
949+
const error = new Error('Error.');
950+
const requestStream = bigtable.request(config);
951+
requestStream.emit('reading');
952+
GAX_STREAM.emit('error', error);
953+
requestStream.on('error', (err: Error) => {
954+
assert.strictEqual(err, error);
955+
done();
956+
});
957+
});
947958
});
948959
});
949960
});

0 commit comments

Comments
 (0)