diff --git a/src/createApiCall.ts b/src/createApiCall.ts index acffb6d3a..23080c1f0 100644 --- a/src/createApiCall.ts +++ b/src/createApiCall.ts @@ -31,6 +31,7 @@ import {Descriptor} from './descriptor'; import {CallOptions, CallSettings} from './gax'; import {retryable} from './normalCalls/retries'; import {addTimeoutArg} from './normalCalls/timeout'; +import {StreamingApiCaller} from './streamingCalls/streamingApiCaller'; /** * Converts an rpc call into an API call governed by the settings. @@ -83,8 +84,16 @@ export function createApiCall( .then((func: GRPCCall) => { // Initially, the function is just what gRPC server stub contains. func = currentApiCaller.wrap(func); + + const streaming = (currentApiCaller as StreamingApiCaller).descriptor + ?.streaming; const retry = thisSettings.retry; - if (retry && retry.retryCodes && retry.retryCodes.length > 0) { + if ( + !streaming && + retry && + retry.retryCodes && + retry.retryCodes.length > 0 + ) { retry.backoffSettings.initialRpcTimeoutMillis = retry.backoffSettings.initialRpcTimeoutMillis || thisSettings.timeout; diff --git a/test/unit/streaming.ts b/test/unit/streaming.ts index edf840ed5..38ac3d910 100644 --- a/test/unit/streaming.ts +++ b/test/unit/streaming.ts @@ -18,7 +18,7 @@ import * as assert from 'assert'; import * as sinon from 'sinon'; -import {describe, it} from 'mocha'; +import {afterEach, describe, it} from 'mocha'; import {PassThrough} from 'stream'; import {GaxCallStream, GRPCCall} from '../../src/apitypes'; @@ -43,6 +43,10 @@ function createApiCallStreaming( } describe('streaming', () => { + afterEach(() => { + sinon.restore(); + }); + it('handles server streaming', done => { const spy = sinon.spy((...args: Array<{}>) => { assert.strictEqual(args.length, 3); @@ -147,6 +151,41 @@ describe('streaming', () => { s.end(); }); + it('allows custome CallOptions.retry settings', done => { + sinon + .stub(streaming.StreamProxy.prototype, 'forwardEvents') + .callsFake(stream => { + assert(stream instanceof internal.Stream); + done(); + }); + const spy = sinon.spy((...args: Array<{}>) => { + assert.strictEqual(args.length, 3); + const s = new PassThrough({ + objectMode: true, + }); + return s; + }); + + const apiCall = createApiCallStreaming( + spy, + streaming.StreamType.SERVER_STREAMING + ); + + apiCall( + {}, + { + retry: gax.createRetryOptions([1], { + initialRetryDelayMillis: 100, + retryDelayMultiplier: 1.2, + maxRetryDelayMillis: 1000, + rpcTimeoutMultiplier: 1.5, + maxRpcTimeoutMillis: 3000, + totalTimeoutMillis: 4500, + }), + } + ); + }); + it('forwards metadata and status', done => { const responseMetadata = {metadata: true}; const status = {code: 0, metadata: responseMetadata};