Skip to content

Commit b5a7eb1

Browse files
author
Lalji Kanjareeya
committedJul 21, 2020
fix: system-test issue
1 parent 6926e6b commit b5a7eb1

File tree

3 files changed

+33
-45
lines changed

3 files changed

+33
-45
lines changed
 

‎src/table.ts

+9-5
Original file line numberDiff line numberDiff line change
@@ -809,9 +809,13 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
809809

810810
requestStream!.on('request', () => numRequestsMade++);
811811

812-
const transform = new Transform({
812+
const toRowStream = new Transform({
813813
transform: (rowData, _, next) => {
814-
if (chunkTransformer._destroyed || !userStream.writable) {
814+
if (
815+
chunkTransformer._destroyed ||
816+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
817+
(userStream as any)._writableState.ended
818+
) {
815819
return next();
816820
}
817821
numRequestsMade = 0;
@@ -823,7 +827,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
823827
objectMode: true,
824828
});
825829

826-
rowStream = pumpify.obj([requestStream, chunkTransformer, transform]);
830+
rowStream = pumpify.obj([requestStream, chunkTransformer, toRowStream]);
827831

828832
rowStream.on('error', (error: ServiceError) => {
829833
if (IGNORED_STATUS_CODES.has(error.code)) {
@@ -1557,7 +1561,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
15571561
appProfileId: this.bigtable.appProfileId,
15581562
};
15591563

1560-
const transform = new Transform({
1564+
const rowKeysStream = new Transform({
15611565
transform(key, enc, next) {
15621566
next(null, {
15631567
key: key.rowKey,
@@ -1574,7 +1578,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
15741578
reqOpts,
15751579
gaxOpts: gaxOptions,
15761580
}),
1577-
transform,
1581+
rowKeysStream,
15781582
]);
15791583
}
15801584

‎system-test/read-rows-acceptance-test.json

-16
Original file line numberDiff line numberDiff line change
@@ -378,14 +378,6 @@
378378
"name": "invalid - duplicate row key",
379379
"chunks_base64": ["CgRSS18xEgMKAUEaAwoBQyBkMgl2YWx1ZS1WQUxIAQ==", "CgRSS18xEgMKAUIaAwoBRCBkMgl2YWx1ZS1WQUxIAQ=="],
380380
"results": [{
381-
"fm": "A",
382-
"rk": "RK_1",
383-
"qual": "C",
384-
"label": "",
385-
"error": false,
386-
"value": "value-VAL",
387-
"ts": 100
388-
}, {
389381
"fm": "",
390382
"rk": "",
391383
"qual": "",
@@ -399,14 +391,6 @@
399391
"name": "invalid - new row missing row key",
400392
"chunks_base64": ["CgRSS18xEgMKAUEaAwoBQyBkMgl2YWx1ZS1WQUxIAQ==", "IGQyCXZhbHVlLVZBTEgB"],
401393
"results": [{
402-
"fm": "A",
403-
"rk": "RK_1",
404-
"qual": "C",
405-
"label": "",
406-
"error": false,
407-
"value": "value-VAL",
408-
"ts": 100
409-
}, {
410394
"fm": "",
411395
"rk": "",
412396
"qual": "",

‎system-test/read-rows-acceptance-tests.ts

+24-24
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@ import {protobuf} from 'google-gax';
2424
import * as fs from 'fs';
2525
import * as path from 'path';
2626
import {Instance} from '../src/instance';
27-
import {Bigtable} from '../src';
28-
// eslint-disable-next-line @typescript-eslint/no-var-requires
29-
const streamEvents = require('stream-events');
27+
import {Bigtable, AbortableDuplex} from '../src';
3028

3129
const protosJson = path.resolve(__dirname, '../protos/protos.json');
3230
const root = protobuf.Root.fromJSON(
@@ -71,29 +69,31 @@ describe('Read Row Acceptance tests', () => {
7169
table.bigtable = {} as Bigtable;
7270
// eslint-disable-next-line @typescript-eslint/no-explicit-any
7371
(table.bigtable.request as any) = () => {
74-
const stream = streamEvents(new PassThrough({objectMode: true}));
75-
stream.abort = () => {};
72+
const stream = new PassThrough({
73+
objectMode: true,
74+
});
7675

77-
const mocked_stream = function (size: number) {
78-
test.chunks_base64.forEach((value, index, array) => {
79-
const chunk = value;
80-
const cellChunk = CellChunk.decode(
81-
Buffer.from(chunk as string, 'base64')
82-
);
83-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
84-
let readRowsResponse: any = {chunks: [cellChunk]};
85-
readRowsResponse = ReadRowsResponse.create(readRowsResponse);
86-
readRowsResponse = ReadRowsResponse.toObject(readRowsResponse, {
87-
defaults: true,
88-
longs: String,
89-
oneofs: true,
90-
});
91-
stream.emit('data', readRowsResponse);
92-
});
93-
stream.emit('end');
94-
};
76+
((stream as {}) as AbortableDuplex).abort = () => {};
9577

96-
stream.once('reading', mocked_stream);
78+
setImmediate(() => {
79+
test.chunks_base64
80+
.map(chunk => {
81+
const cellChunk = CellChunk.decode(
82+
Buffer.from(chunk as string, 'base64')
83+
); //.decode64(chunk);
84+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
85+
let readRowsResponse: any = {chunks: [cellChunk]};
86+
readRowsResponse = ReadRowsResponse.create(readRowsResponse);
87+
readRowsResponse = ReadRowsResponse.toObject(readRowsResponse, {
88+
defaults: true,
89+
longs: String,
90+
oneofs: true,
91+
});
92+
return readRowsResponse;
93+
})
94+
.forEach(readRowsResponse => stream.push(readRowsResponse));
95+
stream.push(null);
96+
});
9797

9898
return stream;
9999
};

0 commit comments

Comments
 (0)