Skip to content

Commit

Permalink
fix: cancel streaming grpc request when user ends stream (#507)
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Jul 31, 2019
1 parent 4e02ff5 commit 2b4297c
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 3 deletions.
20 changes: 20 additions & 0 deletions src/table.ts
Expand Up @@ -32,6 +32,8 @@ import {ChunkTransformer} from './chunktransformer';
// See protos/google/rpc/code.proto
// (4=DEADLINE_EXCEEDED, 10=ABORTED, 14=UNAVAILABLE)
const RETRYABLE_STATUS_CODES = new Set([4, 10, 14]);
// (1=CANCELLED)
const IGNORED_STATUS_CODES = new Set([1]);

/**
* Create a Table object to interact with a Cloud Bigtable table.
Expand Down Expand Up @@ -269,6 +271,8 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
options = options || {};
const maxRetries = is.number(this.maxRetries) ? this.maxRetries : 3;

let activeRequestStream;

let rowKeys;
const ranges = options.ranges || [];
let filter;
Expand Down Expand Up @@ -321,6 +325,14 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
}

const userStream = through.obj();
const end = userStream.end.bind(userStream);
userStream.end = () => {
if (activeRequestStream) {
activeRequestStream.abort();
}
end();
};

let chunkTransformer;

const makeNewRequest = () => {
Expand Down Expand Up @@ -419,6 +431,8 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
retryOpts,
});

activeRequestStream = requestStream;

requestStream.on('request', () => numRequestsMade++);

const rowStream = pumpify.obj([
Expand All @@ -440,6 +454,12 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
]);

rowStream.on('error', error => {
if (IGNORED_STATUS_CODES.has(error.code)) {
// We ignore the `cancelled` "error", since we are the ones who cause
// it when the user calls `.abort()`.
userStream.end();
return;
}
rowStream.unpipe(userStream);
if (
numRequestsMade <= maxRetries &&
Expand Down
14 changes: 14 additions & 0 deletions system-test/bigtable.ts
Expand Up @@ -581,6 +581,20 @@ describe('Bigtable', () => {
});
});

it('should should cancel request if stream ended early', done => {
const rows: any = [];
const stream = TABLE.createReadStream()
.on('error', done)
.on('data', row => {
stream.end();
rows.push(row);
})
.on('end', () => {
assert.strictEqual(rows.length, 1);
done();
});
});

it('should fetch an individual row', async () => {
const row = TABLE.row('alincoln');
const [row_] = await row.get();
Expand Down
3 changes: 3 additions & 0 deletions system-test/read-rows-acceptance-tests.ts
Expand Up @@ -87,6 +87,9 @@ describe('Read Row Acceptance tests', function() {
objectMode: true,
});

/* tslint:disable-next-line */
(stream as any).abort = function() {};

setImmediate(function() {
test.chunks_base64
.map(chunk => {
Expand Down
8 changes: 5 additions & 3 deletions system-test/read-rows.ts
Expand Up @@ -123,9 +123,11 @@ describe('Bigtable/Table', () => {
}
requestedOptions.push(requestOptions);
rowKeysRead.push([]);
const emitter = through.obj();
dispatch(emitter, responses.shift());
return emitter;
const requestStream = through.obj();
/* tslint:disable-next-line */
(requestStream as any).abort = () => {};
dispatch(requestStream, responses.shift());
return requestStream;
});
});

Expand Down
38 changes: 38 additions & 0 deletions test/table.ts
Expand Up @@ -416,6 +416,23 @@ describe('Bigtable/Table', function() {
table.createReadStream();
});

it('should abort request on end', function(done) {
table.bigtable.request = function(config) {
const requestStream = new PassThrough({
objectMode: true,
});

/* tslint:disable-next-line */
(requestStream as any).abort = function() {
done();
};

return requestStream;
};

table.createReadStream().end();
});

describe('options', function() {
it('should accept gaxOptions', function(done) {
const gaxOptions = {};
Expand Down Expand Up @@ -779,6 +796,9 @@ describe('Bigtable/Table', function() {
objectMode: true,
});

/* tslint:disable-next-line */
(stream as any).abort = function() {};

setImmediate(function() {
stream.push(fakeChunks);
stream.push(null);
Expand Down Expand Up @@ -1001,6 +1021,9 @@ describe('Bigtable/Table', function() {
objectMode: true,
});

/* tslint:disable-next-line */
(stream as any).abort = function() {};

setImmediate(function() {
stream.emit('request');
emitters.shift()(stream);
Expand Down Expand Up @@ -1032,6 +1055,21 @@ describe('Bigtable/Table', function() {
});
});

it('should not retry CANCELLED errors', function(done) {
emitters = [
function(stream) {
const cancelledError: any = new Error('do not retry me!');
cancelledError.code = 1;
stream.emit('error', cancelledError);
stream.end();
},
];
callCreateReadStream(null, () => {
assert.strictEqual(reqOptsCalls.length, 1);
done();
});
});

it('should have a range which starts after the last read key', function(done) {
emitters = [
function(stream) {
Expand Down

0 comments on commit 2b4297c

Please sign in to comment.