Skip to content

Commit

Permalink
fix(deps): update dependency @google-cloud/common to v3 and handle st…
Browse files Browse the repository at this point in the history
…ream ending (#704)

* fix: handle unpiping at userStream end
* fix: check for defined rowStream and use builtins

Co-authored-by: Benjamin E. Coe <bencoe@google.com>
  • Loading branch information
crwilcox and bcoe committed Apr 15, 2020
1 parent a95ab4a commit d8ada04
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -43,7 +43,7 @@
"test": "c8 mocha build/test"
},
"dependencies": {
"@google-cloud/common": "^2.2.2",
"@google-cloud/common": "^3.0.0",
"@google-cloud/paginator": "^3.0.0",
"@google-cloud/projectify": "^2.0.0",
"@google-cloud/promisify": "^2.0.0",
Expand Down
7 changes: 5 additions & 2 deletions src/table.ts
Expand Up @@ -17,6 +17,7 @@ import {promisifyAll} from '@google-cloud/promisify';
import arrify = require('arrify');
import {ServiceError} from 'google-gax';
import {decorateStatus} from './decorateStatus';
import {PassThrough} from 'stream';

// eslint-disable-next-line @typescript-eslint/no-var-requires
const concat = require('concat-stream');
Expand Down Expand Up @@ -683,16 +684,18 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
rowsLimit = options.limit;
}

const userStream = through.obj();
const userStream = new PassThrough({objectMode: true});
const end = userStream.end.bind(userStream);
userStream.end = () => {
rowStream?.unpipe(userStream);
if (activeRequestStream) {
activeRequestStream.abort();
}
end();
};

let chunkTransformer: ChunkTransformer;
let rowStream: Duplex;

const makeNewRequest = () => {
const lastRowKey = chunkTransformer ? chunkTransformer.lastRowKey : '';
Expand Down Expand Up @@ -808,7 +811,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);

requestStream!.on('request', () => numRequestsMade++);

const rowStream: Duplex = pumpify.obj([
rowStream = pumpify.obj([
requestStream,
chunkTransformer,
through.obj((rowData, enc, next) => {
Expand Down

0 comments on commit d8ada04

Please sign in to comment.