Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make INTERNAL RST_STREAM errors retryable #947

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 18 additions & 2 deletions src/table.ts
Expand Up @@ -45,7 +45,9 @@ import {Duplex} from 'stream';

// See protos/google/rpc/code.proto
// (4=DEADLINE_EXCEEDED, 10=ABORTED, 14=UNAVAILABLE)
const RETRYABLE_STATUS_CODES = new Set([4, 10, 14]);
const UNAVAILABLE_STATUS_CODE = 14;
const INTERNAL_STATUS_CODE = 13;
const RETRYABLE_STATUS_CODES = new Set([4, 10, UNAVAILABLE_STATUS_CODE]);
// (1=CANCELLED)
const IGNORED_STATUS_CODES = new Set([1]);

Expand Down Expand Up @@ -929,9 +931,23 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
return;
}
rowStream.unpipe(userStream);

// Under normal conditions RST_STREAM errors will get wrapped as UNAVAILABLE.
// However if the RST_STREAM occurs further upstream, it can come down
// as an INTERNAL. This is a workaround to enable retrying those errors.
let effectiveCode = error.code;
if (effectiveCode === INTERNAL_STATUS_CODE) {
const errMsg = (error.message || '').toLowerCase();
if (
errMsg.indexOf('rst stream') > -1 ||
errMsg.indexOf('rst_stream') > -1
) {
effectiveCode = UNAVAILABLE_STATUS_CODE;
}
}
if (
numRequestsMade <= maxRetries &&
RETRYABLE_STATUS_CODES.has(error.code)
RETRYABLE_STATUS_CODES.has(effectiveCode)
) {
makeNewRequest();
} else {
Expand Down
21 changes: 21 additions & 0 deletions test/table.ts
Expand Up @@ -1217,6 +1217,27 @@ describe('Bigtable/Table', () => {
});
});

it('should retry the stream on internal rst_stream errors', done => {
emitters = [
((stream: Writable) => {
const error = new Error(
'Received RST_STREAM with code 2 (Internal server error)'
) as ServiceError;
error.code = 13; // INTERNAL
stream.emit('error', error);
stream.end();
}) as {} as EventEmitter,
((stream: Writable) => {
stream.end();
}) as {} as EventEmitter,
];

callCreateReadStream(null, () => {
assert.strictEqual(reqOptsCalls.length, 2);
done();
});
});

it('should not retry CANCELLED errors', done => {
emitters = [
((stream: Writable) => {
Expand Down