Skip to content

Commit

Permalink
fix: allow callOptions.retry settings with grpc streaming calls (#901)
Browse files Browse the repository at this point in the history
Currently if user passes custom `CallOptions.retry` settings to a function that employs grpc streaming call, the API will throw
```ts
TypeError: stream.on is not a function
```

from this line 

https://github.com/googleapis/gax-nodejs/blob/73b9d0d06fc24fdd11e5a408a396c082df57d177/src/streamingCalls/streaming.ts#L119


due to GRPCCall func being wrapped in [`retryable`](https://github.com/googleapis/gax-nodejs/blob/master/src/createApiCall.ts#L88)
  • Loading branch information
AVaksman committed Sep 24, 2020
1 parent 963c714 commit 533de29
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
11 changes: 10 additions & 1 deletion src/createApiCall.ts
Expand Up @@ -31,6 +31,7 @@ import {Descriptor} from './descriptor';
import {CallOptions, CallSettings} from './gax';
import {retryable} from './normalCalls/retries';
import {addTimeoutArg} from './normalCalls/timeout';
import {StreamingApiCaller} from './streamingCalls/streamingApiCaller';

/**
* Converts an rpc call into an API call governed by the settings.
Expand Down Expand Up @@ -83,8 +84,16 @@ export function createApiCall(
.then((func: GRPCCall) => {
// Initially, the function is just what gRPC server stub contains.
func = currentApiCaller.wrap(func);

const streaming = (currentApiCaller as StreamingApiCaller).descriptor
?.streaming;
const retry = thisSettings.retry;
if (retry && retry.retryCodes && retry.retryCodes.length > 0) {
if (
!streaming &&
retry &&
retry.retryCodes &&
retry.retryCodes.length > 0
) {
retry.backoffSettings.initialRpcTimeoutMillis =
retry.backoffSettings.initialRpcTimeoutMillis ||
thisSettings.timeout;
Expand Down
41 changes: 40 additions & 1 deletion test/unit/streaming.ts
Expand Up @@ -18,7 +18,7 @@

import * as assert from 'assert';
import * as sinon from 'sinon';
import {describe, it} from 'mocha';
import {afterEach, describe, it} from 'mocha';
import {PassThrough} from 'stream';

import {GaxCallStream, GRPCCall} from '../../src/apitypes';
Expand All @@ -43,6 +43,10 @@ function createApiCallStreaming(
}

describe('streaming', () => {
afterEach(() => {
sinon.restore();
});

it('handles server streaming', done => {
const spy = sinon.spy((...args: Array<{}>) => {
assert.strictEqual(args.length, 3);
Expand Down Expand Up @@ -147,6 +151,41 @@ describe('streaming', () => {
s.end();
});

it('allows custome CallOptions.retry settings', done => {
sinon
.stub(streaming.StreamProxy.prototype, 'forwardEvents')
.callsFake(stream => {
assert(stream instanceof internal.Stream);
done();
});
const spy = sinon.spy((...args: Array<{}>) => {
assert.strictEqual(args.length, 3);
const s = new PassThrough({
objectMode: true,
});
return s;
});

const apiCall = createApiCallStreaming(
spy,
streaming.StreamType.SERVER_STREAMING
);

apiCall(
{},
{
retry: gax.createRetryOptions([1], {
initialRetryDelayMillis: 100,
retryDelayMultiplier: 1.2,
maxRetryDelayMillis: 1000,
rpcTimeoutMultiplier: 1.5,
maxRpcTimeoutMillis: 3000,
totalTimeoutMillis: 4500,
}),
}
);
});

it('forwards metadata and status', done => {
const responseMetadata = {metadata: true};
const status = {code: 0, metadata: responseMetadata};
Expand Down

0 comments on commit 533de29

Please sign in to comment.