@@ -17,6 +17,7 @@ import {promisifyAll} from '@google-cloud/promisify';
17
17
import arrify = require( 'arrify' ) ;
18
18
import { ServiceError } from 'google-gax' ;
19
19
import { decorateStatus } from './decorateStatus' ;
20
+ import { PassThrough } from 'stream' ;
20
21
21
22
// eslint-disable-next-line @typescript-eslint/no-var-requires
22
23
const concat = require ( 'concat-stream' ) ;
@@ -683,16 +684,18 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
683
684
rowsLimit = options . limit ;
684
685
}
685
686
686
- const userStream = through . obj ( ) ;
687
+ const userStream = new PassThrough ( { objectMode : true } ) ;
687
688
const end = userStream . end . bind ( userStream ) ;
688
689
userStream . end = ( ) => {
690
+ rowStream ?. unpipe ( userStream ) ;
689
691
if ( activeRequestStream ) {
690
692
activeRequestStream . abort ( ) ;
691
693
}
692
694
end ( ) ;
693
695
} ;
694
696
695
697
let chunkTransformer : ChunkTransformer ;
698
+ let rowStream : Duplex ;
696
699
697
700
const makeNewRequest = ( ) => {
698
701
const lastRowKey = chunkTransformer ? chunkTransformer . lastRowKey : '' ;
@@ -808,7 +811,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
808
811
809
812
requestStream ! . on ( 'request' , ( ) => numRequestsMade ++ ) ;
810
813
811
- const rowStream : Duplex = pumpify . obj ( [
814
+ rowStream = pumpify . obj ( [
812
815
requestStream ,
813
816
chunkTransformer ,
814
817
through . obj ( ( rowData , enc , next ) => {
0 commit comments