From 218a4c65afcc55158aac45b98a4ccb28b88c00a1 Mon Sep 17 00:00:00 2001 From: Sebastian Schmidt Date: Thu, 2 Jan 2020 13:17:50 -0800 Subject: [PATCH] feat: use GAX retry config for streams (#847) --- README.md | 4 +- dev/src/backoff.ts | 4 +- dev/src/index.ts | 169 ++++++++++++++++++--------------------- dev/src/reference.ts | 14 +--- dev/src/transaction.ts | 15 +--- dev/src/types.ts | 4 - dev/src/util.ts | 25 ++++++ dev/src/watch.ts | 156 +++++------------------------------- dev/src/write-batch.ts | 10 +-- dev/test/document.ts | 30 +------ dev/test/index.ts | 105 +++++++++++++++--------- dev/test/query.ts | 7 +- dev/test/transaction.ts | 19 +---- dev/test/util/helpers.ts | 4 +- dev/test/watch.ts | 138 +++++++++++++++----------------- package.json | 2 +- 16 files changed, 275 insertions(+), 431 deletions(-) diff --git a/README.md b/README.md index d1cd97732..536072cc8 100644 --- a/README.md +++ b/README.md @@ -15,9 +15,7 @@ This is the Node.js Server SDK for [Google Cloud Firestore](https://firebase.goo This Cloud Firestore Server SDK uses Google’s Cloud Identity and Access Management for authentication and should only be used in trusted environments. Your Cloud Identity credentials allow you bypass all access restrictions and provide read and write access to all data in your Cloud Firestore project. -The Cloud Firestore Server SDKs are designed to manage the full set of data in your Cloud Firestore project and work best with reliable network connectivity. Data operations performed via these SDKs directly access the Cloud Firestore backend and all document reads and writes are optimized for high throughput. - -Applications that use Google's Server SDKs should not be used in end-user environments, such as on phones or on publicly hosted websites. If you are developing a Web or Node.js application that accesses Cloud Firestore on behalf of end users, use the firebase Client SDK. +Applications that use Google's Server SDKs should not be used in end-user environments, such as on phones or on publicly hosted websites. If you are developing a Web or Node.js application that accesses Cloud Firestore on behalf of end users, use the Firebase Client SDK. **Note:** This Cloud Firestore Server SDK does not support Firestore databases created in [Datastore mode](https://cloud.google.com/datastore/docs/firestore-or-datastore#in_datastore_mode). To access these databases, use the [Datastore SDK](https://www.npmjs.com/package/@google-cloud/datastore). diff --git a/dev/src/backoff.ts b/dev/src/backoff.ts index 6529ecebd..f0bc32017 100644 --- a/dev/src/backoff.ts +++ b/dev/src/backoff.ts @@ -80,7 +80,7 @@ export function setTimeoutHandler( * * @private */ -export interface ExponentialBackoffOptions { +export interface ExponentialBackoffSetting { /** Optional override for the initial retry delay. */ initialDelayMs?: number; /** Optional override for the exponential backoff factor. */ @@ -162,7 +162,7 @@ export class ExponentialBackoff { */ private awaitingBackoffCompletion = false; - constructor(options: ExponentialBackoffOptions = {}) { + constructor(options: ExponentialBackoffSetting = {}) { this.initialDelayMs = options.initialDelayMs !== undefined ? options.initialDelayMs diff --git a/dev/src/index.ts b/dev/src/index.ts index 5211ae25f..c765b30b5 100644 --- a/dev/src/index.ts +++ b/dev/src/index.ts @@ -14,12 +14,13 @@ * limitations under the License. */ -import {CallOptions} from 'google-gax'; +import {CallOptions, GoogleError} from 'google-gax'; import {Duplex, PassThrough} from 'stream'; import * as through2 from 'through2'; import {URL} from 'url'; import {google} from '../protos/firestore_v1_proto_api'; +import {ExponentialBackoff, ExponentialBackoffSetting} from './backoff'; import {fieldsFromJson, timestampFromJson} from './convert'; import { DocumentSnapshot, @@ -40,14 +41,8 @@ import {DocumentReference} from './reference'; import {Serializer} from './serializer'; import {Timestamp} from './timestamp'; import {parseGetAllArguments, Transaction} from './transaction'; -import { - ApiMapValue, - GapicClient, - GrpcError, - ReadOptions, - Settings, -} from './types'; -import {Deferred, requestTag} from './util'; +import {ApiMapValue, GapicClient, ReadOptions, Settings} from './types'; +import {Deferred, isPermanentRpcError, requestTag} from './util'; import { validateBoolean, validateFunction, @@ -59,6 +54,9 @@ import { } from './validate'; import {WriteBatch} from './write-batch'; +import {interfaces} from './v1/firestore_client_config.json'; +const serviceConfig = interfaces['google.firestore.v1.Firestore']; + import api = google.firestore.v1; export { @@ -143,11 +141,6 @@ const DEFAULT_MAX_IDLE_CHANNELS = 1; */ const MAX_CONCURRENT_REQUESTS_PER_CLIENT = 100; -/*! - * GRPC Error code for 'UNAVAILABLE'. - */ -const GRPC_UNAVAILABLE = 14; - /** * Document data (e.g. for use with * [set()]{@link DocumentReference#set}) consisting of fields mapped @@ -265,6 +258,12 @@ export class Firestore { */ private _settings: Settings = {}; + /** + * Settings for the exponential backoff used by the streaming endpoints. + * @private + */ + private _backoffSettings: ExponentialBackoffSetting; + /** * Whether the initialization settings can still be changed by invoking * `settings()`. @@ -368,6 +367,13 @@ export class Firestore { this.validateAndApplySettings({...settings, ...libraryHeader}); } + const retryConfig = serviceConfig.retry_params.default; + this._backoffSettings = { + initialDelayMs: retryConfig.initial_retry_delay_millis, + maxDelayMs: retryConfig.max_retry_delay_millis, + backoffFactor: retryConfig.retry_delay_multiplier, + }; + // GCF currently tears down idle connections after two minutes. Requests // that are issued after this period may fail. On GCF, we therefore issue // these requests as part of a transaction so that we can safely retry until @@ -1107,62 +1113,53 @@ export class Firestore { * for further attempts. * * @private - * @param attemptsRemaining The number of available attempts. + * @param methodName Name of the Veneer API endpoint that takes a request + * and GAX options. * @param requestTag A unique client-assigned identifier for this request. * @param func Method returning a Promise than can be retried. - * @param delayMs How long to wait before issuing a this retry. Defaults to - * zero. * @returns - A Promise with the function's result if successful within * `attemptsRemaining`. Otherwise, returns the last rejected Promise. */ - private _retry( - attemptsRemaining: number, + private async _retry( + methodName: string, requestTag: string, - func: () => Promise, - delayMs = 0 + func: () => Promise ): Promise { - const self = this; + const backoff = new ExponentialBackoff(); - const currentDelay = delayMs; - const nextDelay = delayMs || 100; + let lastError: Error | undefined = undefined; - --attemptsRemaining; - - return new Promise(resolve => { - setTimeout(resolve, currentDelay); - }) - .then(func) - .then(result => { - self._lastSuccessfulRequest = new Date().getTime(); - return result; - }) - .catch(err => { - if (err.code !== undefined && err.code !== GRPC_UNAVAILABLE) { - logger( - 'Firestore._retry', - requestTag, - 'Request failed with unrecoverable error:', - err - ); - return Promise.reject(err); - } - if (attemptsRemaining === 0) { - logger( - 'Firestore._retry', - requestTag, - 'Request failed with error:', - err - ); - return Promise.reject(err); - } + for (let attempt = 0; attempt < MAX_REQUEST_RETRIES; ++attempt) { + if (lastError) { logger( 'Firestore._retry', requestTag, 'Retrying request that failed with error:', - err + lastError ); - return self._retry(attemptsRemaining, requestTag, func, nextDelay); - }); + } + + try { + await backoff.backoffAndWait(); + const result = await func(); + this._lastSuccessfulRequest = new Date().getTime(); + return result; + } catch (err) { + lastError = err; + + if (isPermanentRpcError(err, methodName, serviceConfig)) { + break; + } + } + } + + logger( + 'Firestore._retry', + requestTag, + 'Request failed with error:', + lastError + ); + return Promise.reject(lastError); } /** @@ -1269,51 +1266,37 @@ export class Firestore { * necessary within the request options. * * @private - * @param methodName Name of the veneer API endpoint that takes a request + * @param methodName Name of the Veneer API endpoint that takes a request * and GAX options. * @param request The Protobuf request to send. * @param requestTag A unique client-assigned identifier for this request. - * @param allowRetries Whether this is an idempotent request that can be - * retried. * @returns A Promise with the request result. */ - request( - methodName: string, - request: {}, - requestTag: string, - allowRetries: boolean - ): Promise { - const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1; + request(methodName: string, request: {}, requestTag: string): Promise { const callOptions = this.createCallOptions(); return this._clientPool.run(requestTag, gapicClient => { - return this._retry(attempts, requestTag, () => { - return new Promise((resolve, reject) => { - logger( - 'Firestore.request', - requestTag, - 'Sending request: %j', - request - ); - gapicClient[methodName]( - request, - callOptions, - (err: GrpcError, result: T) => { - if (err) { - logger('Firestore.request', requestTag, 'Received error:', err); - reject(err); - } else { - logger( - 'Firestore.request', - requestTag, - 'Received response: %j', - result - ); - resolve(result); - } + return new Promise((resolve, reject) => { + logger('Firestore.request', requestTag, 'Sending request: %j', request); + gapicClient[methodName]( + request, + callOptions, + (err: GoogleError, result: T) => { + if (err) { + logger('Firestore.request', requestTag, 'Received error:', err); + reject(err); + } else { + logger( + 'Firestore.request', + requestTag, + 'Received response: %j', + result + ); + this._lastSuccessfulRequest = new Date().getTime(); + resolve(result); } - ); - }); + } + ); }); }); } @@ -1349,7 +1332,7 @@ export class Firestore { // stream. const lifetime = new Deferred(); - this._retry(MAX_REQUEST_RETRIES, requestTag, async () => { + this._retry(methodName, requestTag, async () => { logger( 'Firestore.requestStream', requestTag, diff --git a/dev/src/reference.ts b/dev/src/reference.ts index ae98085e3..048622b0e 100644 --- a/dev/src/reference.ts +++ b/dev/src/reference.ts @@ -293,12 +293,7 @@ export class DocumentReference implements Serializable { pageSize: Math.pow(2, 16) - 1, }; return this._firestore - .request( - 'listCollectionIds', - request, - tag, - /* allowRetries= */ true - ) + .request('listCollectionIds', request, tag) .then(collectionIds => { const collections: CollectionReference[] = []; @@ -2065,12 +2060,7 @@ export class CollectionReference extends Query { }; return this.firestore - .request( - 'listDocuments', - request, - tag, - /*allowRetries=*/ true - ) + .request('listDocuments', request, tag) .then(documents => { // Note that the backend already orders these documents by name, // so we do not need to manually sort them. diff --git a/dev/src/transaction.ts b/dev/src/transaction.ts index 5d260a1c8..25b7cea24 100644 --- a/dev/src/transaction.ts +++ b/dev/src/transaction.ts @@ -50,11 +50,6 @@ import api = proto.google.firestore.v1; const READ_AFTER_WRITE_ERROR_MSG = 'Firestore transactions require all reads to be executed before all writes.'; -/*! - * Transactions can be retried if the initial stream opening errors out. - */ -const ALLOW_RETRIES = true; - /** * A reference to a transaction. * @@ -372,8 +367,7 @@ export class Transaction { .request( 'beginTransaction', request, - this._requestTag, - ALLOW_RETRIES + this._requestTag ) .then(resp => { this._transactionId = resp.transaction!; @@ -405,12 +399,7 @@ export class Transaction { transaction: this._transactionId, }; - return this._firestore.request( - 'rollback', - request, - this._requestTag, - /* allowRetries= */ false - ); + return this._firestore.request('rollback', request, this._requestTag); } /** diff --git a/dev/src/types.ts b/dev/src/types.ts index a70517a2d..cca4fdc0d 100644 --- a/dev/src/types.ts +++ b/dev/src/types.ts @@ -36,10 +36,6 @@ export type GapicClient = any; // tslint:disable-next-line:no-any export type RBTree = any; -export class GrpcError extends Error { - code?: number; -} - /** * Settings used to directly configure a `Firestore` instance. */ diff --git a/dev/src/util.ts b/dev/src/util.ts index b3915e97b..95252e5a4 100644 --- a/dev/src/util.ts +++ b/dev/src/util.ts @@ -14,6 +14,8 @@ * limitations under the License. */ +import {GoogleError, ServiceConfig, Status} from 'google-gax'; + /** * A Promise implementation that supports deferred resolution. * @private @@ -92,3 +94,26 @@ export function isEmpty(value: {}): boolean { export function isFunction(value: unknown): boolean { return typeof value === 'function'; } + +/** + * Determines whether the provided error is considered permanent for the given + * RPC. + * + * @private + */ +export function isPermanentRpcError( + err: GoogleError, + methodName: string, + config: ServiceConfig +): boolean { + if (err.code !== undefined) { + const serviceConfigName = methodName[0].toUpperCase() + methodName.slice(1); + const retryCodeNames = config.methods[serviceConfigName]!.retry_codes_name!; + const retryCodes = config.retry_codes![retryCodeNames].map( + errorName => Status[errorName as keyof typeof Status] + ); + return retryCodes.indexOf(err.code) === -1; + } else { + return false; + } +} diff --git a/dev/src/watch.ts b/dev/src/watch.ts index 9f4692852..6771399fa 100644 --- a/dev/src/watch.ts +++ b/dev/src/watch.ts @@ -16,6 +16,7 @@ import * as assert from 'assert'; import * as rbtree from 'functional-red-black-tree'; +import {GoogleError, Status} from 'google-gax'; import {describe, it} from 'mocha'; import {Duplex} from 'stream'; @@ -27,7 +28,7 @@ import {DocumentReference, Firestore, Query} from './index'; import {logger} from './logger'; import {QualifiedResourcePath} from './path'; import {Timestamp} from './timestamp'; -import {GrpcError, RBTree} from './types'; +import {RBTree} from './types'; import {requestTag} from './util'; import api = google.firestore.v1; @@ -54,123 +55,6 @@ const ChangeType: {[k: string]: DocumentChangeType} = { removed: 'removed', }; -/*! - * List of GRPC Error Codes. - * - * This corresponds to - * {@link https://github.com/grpc/grpc/blob/master/doc/statuscodes.md}. - */ -const GRPC_STATUS_CODE: {[k: string]: number} = { - // Not an error; returned on success. - OK: 0, - - // The operation was cancelled (typically by the caller). - CANCELLED: 1, - - // Unknown error. An example of where this error may be returned is if a - // Status value received from another address space belongs to an error-space - // that is not known in this address space. Also errors raised by APIs that - // do not return enough error information may be converted to this error. - UNKNOWN: 2, - - // Client specified an invalid argument. Note that this differs from - // FAILED_PRECONDITION. INVALID_ARGUMENT indicates arguments that are - // problematic regardless of the state of the system (e.g., a malformed file - // name). - INVALID_ARGUMENT: 3, - - // Deadline expired before operation could complete. For operations that - // change the state of the system, this error may be returned even if the - // operation has completed successfully. For example, a successful response - // from a server could have been delayed long enough for the deadline to - // expire. - DEADLINE_EXCEEDED: 4, - - // Some requested entity (e.g., file or directory) was not found. - NOT_FOUND: 5, - - // Some entity that we attempted to create (e.g., file or directory) already - // exists. - ALREADY_EXISTS: 6, - - // The caller does not have permission to execute the specified operation. - // PERMISSION_DENIED must not be used for rejections caused by exhausting - // some resource (use RESOURCE_EXHAUSTED instead for those errors). - // PERMISSION_DENIED must not be used if the caller can not be identified - // (use UNAUTHENTICATED instead for those errors). - PERMISSION_DENIED: 7, - - // The request does not have valid authentication credentials for the - // operation. - UNAUTHENTICATED: 16, - - // Some resource has been exhausted, perhaps a per-user quota, or perhaps the - // entire file system is out of space. - RESOURCE_EXHAUSTED: 8, - - // Operation was rejected because the system is not in a state required for - // the operation's execution. For example, directory to be deleted may be - // non-empty, an rmdir operation is applied to a non-directory, etc. - // - // A litmus test that may help a service implementor in deciding - // between FAILED_PRECONDITION, ABORTED, and UNAVAILABLE: - // (a) Use UNAVAILABLE if the client can retry just the failing call. - // (b) Use ABORTED if the client should retry at a higher-level - // (e.g., restarting a read-modify-write sequence). - // (c) Use FAILED_PRECONDITION if the client should not retry until - // the system state has been explicitly fixed. E.g., if an "rmdir" - // fails because the directory is non-empty, FAILED_PRECONDITION - // should be returned since the client should not retry unless - // they have first fixed up the directory by deleting files from it. - // (d) Use FAILED_PRECONDITION if the client performs conditional - // REST Get/Update/Delete on a resource and the resource on the - // server does not match the condition. E.g., conflicting - // read-modify-write on the same resource. - FAILED_PRECONDITION: 9, - - // The operation was aborted, typically due to a concurrency issue like - // sequencer check failures, transaction aborts, etc. - // - // See litmus test above for deciding between FAILED_PRECONDITION, ABORTED, - // and UNAVAILABLE. - ABORTED: 10, - - // Operation was attempted past the valid range. E.g., seeking or reading - // past end of file. - // - // Unlike INVALID_ARGUMENT, this error indicates a problem that may be fixed - // if the system state changes. For example, a 32-bit file system will - // generate INVALID_ARGUMENT if asked to read at an offset that is not in the - // range [0,2^32-1], but it will generate OUT_OF_RANGE if asked to read from - // an offset past the current file size. - // - // There is a fair bit of overlap between FAILED_PRECONDITION and - // OUT_OF_RANGE. We recommend using OUT_OF_RANGE (the more specific error) - // when it applies so that callers who are iterating through a space can - // easily look for an OUT_OF_RANGE error to detect when they are done. - OUT_OF_RANGE: 11, - - // Operation is not implemented or not supported/enabled in this service. - UNIMPLEMENTED: 12, - - // Internal errors. Means some invariants expected by underlying System has - // been broken. If you see one of these errors, Something is very broken. - INTERNAL: 13, - - // The service is currently unavailable. This is a most likely a transient - // condition and may be corrected by retrying with a backoff. - // - // See litmus test above for deciding between FAILED_PRECONDITION, ABORTED, - // and UNAVAILABLE. - UNAVAILABLE: 14, - - // Unrecoverable data loss or corruption. - DATA_LOSS: 15, - - // Force users to include a default branch: - DO_NOT_USE: -1, -}; - /*! * The comparator used for document watches (which should always get called with * the same document). @@ -429,7 +313,7 @@ abstract class Watch { * Closes the stream and calls onError() if the stream is still active. * @private */ - private closeStream(err: GrpcError): void { + private closeStream(err: GoogleError): void { if (this.currentStream) { this.currentStream.end(); this.currentStream = null; @@ -447,8 +331,8 @@ abstract class Watch { * Clears the change map. * @private */ - private maybeReopenStream(err: GrpcError): void { - if (this.isActive && !this.isPermanentError(err)) { + private maybeReopenStream(err: GoogleError): void { + if (this.isActive && !this.isPermanentWatchError(err)) { logger( 'Watch.maybeReopenStream', this.requestTag, @@ -532,8 +416,8 @@ abstract class Watch { if (this.currentStream === backendStream) { this.currentStream = null; - const err = new GrpcError('Stream ended unexpectedly'); - err.code = GRPC_STATUS_CODE.UNKNOWN; + const err = new GoogleError('Stream ended unexpectedly'); + err.code = Status.UNKNOWN; this.maybeReopenStream(err); } }); @@ -569,7 +453,7 @@ abstract class Watch { this.closeStream(Error('Unexpected target ID sent by server')); } } else if (change.targetChangeType === 'REMOVE') { - let code = 13; + let code = Status.INTERNAL; let message = 'internal error'; if (change.cause) { code = change.cause.code!; @@ -808,7 +692,7 @@ abstract class Watch { } /** - * Determines whether an error is considered permanent and should not be + * Determines whether a watch error is considered permanent and should not be * retried. Errors that don't provide a GRPC error code are always considered * transient in this context. * @@ -816,7 +700,7 @@ abstract class Watch { * @param error An error object. * @return Whether the error is permanent. */ - private isPermanentError(error: GrpcError): boolean { + private isPermanentWatchError(error: GoogleError): boolean { if (error.code === undefined) { logger( 'Watch.isPermanentError', @@ -828,14 +712,14 @@ abstract class Watch { } switch (error.code) { - case GRPC_STATUS_CODE.ABORTED: - case GRPC_STATUS_CODE.CANCELLED: - case GRPC_STATUS_CODE.UNKNOWN: - case GRPC_STATUS_CODE.DEADLINE_EXCEEDED: - case GRPC_STATUS_CODE.RESOURCE_EXHAUSTED: - case GRPC_STATUS_CODE.INTERNAL: - case GRPC_STATUS_CODE.UNAVAILABLE: - case GRPC_STATUS_CODE.UNAUTHENTICATED: + case Status.ABORTED: + case Status.CANCELLED: + case Status.UNKNOWN: + case Status.DEADLINE_EXCEEDED: + case Status.RESOURCE_EXHAUSTED: + case Status.INTERNAL: + case Status.UNAVAILABLE: + case Status.UNAUTHENTICATED: return false; default: return true; @@ -850,8 +734,8 @@ abstract class Watch { * @param error A GRPC Error object that exposes an error code. * @return Whether we need to back off our retries. */ - private isResourceExhaustedError(error: GrpcError): boolean { - return error.code === GRPC_STATUS_CODE.RESOURCE_EXHAUSTED; + private isResourceExhaustedError(error: GoogleError): boolean { + return error.code === Status.RESOURCE_EXHAUSTED; } } diff --git a/dev/src/write-batch.ts b/dev/src/write-batch.ts index 1f6cc7cd4..9779c0ea3 100644 --- a/dev/src/write-batch.ts +++ b/dev/src/write-batch.ts @@ -545,8 +545,7 @@ export class WriteBatch { .request( 'beginTransaction', request, - tag, - true + tag ) .then(resp => { return this.commit_({transactionId: resp.transaction!}); @@ -587,12 +586,7 @@ export class WriteBatch { } return this._firestore - .request( - 'commit', - request, - tag, - /* allowRetries= */ false - ) + .request('commit', request, tag) .then(resp => { const writeResults: WriteResult[] = []; diff --git a/dev/test/document.ts b/dev/test/document.ts index 773fa0f6a..37bc425db 100644 --- a/dev/test/document.ts +++ b/dev/test/document.ts @@ -44,7 +44,7 @@ import { writeResult, } from './util/helpers'; -import api = proto.google.firestore.v1; +import {GoogleError, Status} from 'google-gax'; const PROJECT_ID = 'test-project'; @@ -410,30 +410,6 @@ describe('deserialize document', () => { }); }); - it('ignores intermittent stream failures', () => { - let attempts = 1; - - const overrides: ApiOverride = { - batchGetDocuments: () => { - if (attempts < 3) { - ++attempts; - throw new Error('Expected error'); - } else { - return stream(found(document('documentId'))); - } - }, - }; - - return createInstance(overrides).then(firestore => { - return firestore - .doc('collectionId/documentId') - .get() - .then(() => { - expect(attempts).to.equal(3); - }); - }); - }); - it('deserializes date before 1970', () => { const overrides: ApiOverride = { batchGetDocuments: () => { @@ -674,7 +650,9 @@ describe('get document', () => { it('throws error', done => { const overrides: ApiOverride = { batchGetDocuments: () => { - return stream(new Error('RPC Error')); + const error = new GoogleError('RPC Error'); + error.code = Status.PERMISSION_DENIED; + return stream(error); }, }; diff --git a/dev/test/index.ts b/dev/test/index.ts index 7cd355631..59060dd22 100644 --- a/dev/test/index.ts +++ b/dev/test/index.ts @@ -21,8 +21,8 @@ import {google} from '../protos/firestore_v1_proto_api'; import * as Firestore from '../src'; import {DocumentSnapshot, FieldPath} from '../src'; +import {setTimeoutHandler} from '../src/backoff'; import {QualifiedResourcePath} from '../src/path'; -import {GrpcError} from '../src/types'; import { ApiOverride, createInstance, @@ -36,6 +36,7 @@ import { } from './util/helpers'; import api = google.firestore.v1; +import {Status} from 'google-gax'; use(chaiAsPromised); @@ -901,6 +902,14 @@ describe('listCollections() method', () => { }); describe('getAll() method', () => { + before(() => { + setTimeoutHandler(setImmediate); + }); + + after(() => { + setTimeoutHandler(setTimeout); + }); + function resultEquals( result: DocumentSnapshot[], ...docs: api.IBatchGetDocumentsResponse[] @@ -995,6 +1004,30 @@ describe('getAll() method', () => { }); }); + it('handles intermittent stream exception', () => { + let attempts = 1; + + const overrides: ApiOverride = { + batchGetDocuments: () => { + if (attempts < 3) { + ++attempts; + throw new Error('Expected error'); + } else { + return stream(found(document('documentId'))); + } + }, + }; + + return createInstance(overrides).then(firestore => { + return firestore + .doc('collectionId/documentId') + .get() + .then(() => { + expect(attempts).to.equal(3); + }); + }); + }); + it('handles serialization error', () => { const overrides: ApiOverride = { batchGetDocuments: () => { @@ -1018,60 +1051,54 @@ describe('getAll() method', () => { }); }); - it('only retries on GRPC unavailable', () => { - const expectedErrorAttempts = { - /* Cancelled */ 1: 1, - /* Unknown */ 2: 1, - /* InvalidArgument */ 3: 1, - /* DeadlineExceeded */ 4: 1, - /* NotFound */ 5: 1, - /* AlreadyExists */ 6: 1, - /* PermissionDenied */ 7: 1, - /* ResourceExhausted */ 8: 1, - /* FailedPrecondition */ 9: 1, - /* Aborted */ 10: 1, - /* OutOfRange */ 11: 1, - /* Unimplemented */ 12: 1, - /* Internal */ 13: 1, - /* Unavailable */ 14: 5, - /* DataLoss */ 15: 1, - /* Unauthenticated */ 16: 1, + it('retries based on error code', () => { + const expectedErrorAttempts: {[key: number]: number} = { + [Status.CANCELLED]: 1, + [Status.UNKNOWN]: 1, + [Status.INVALID_ARGUMENT]: 1, + [Status.DEADLINE_EXCEEDED]: 5, + [Status.NOT_FOUND]: 1, + [Status.ALREADY_EXISTS]: 1, + [Status.PERMISSION_DENIED]: 1, + [Status.RESOURCE_EXHAUSTED]: 1, + [Status.FAILED_PRECONDITION]: 1, + [Status.ABORTED]: 1, + [Status.OUT_OF_RANGE]: 1, + [Status.UNIMPLEMENTED]: 1, + [Status.INTERNAL]: 5, + [Status.UNAVAILABLE]: 5, + [Status.DATA_LOSS]: 1, + [Status.UNAUTHENTICATED]: 1, }; - const actualErrorAttempts: {[k: number]: number} = {}; + const actualErrorAttempts: {[key: number]: number} = {}; const overrides: ApiOverride = { batchGetDocuments: request => { const errorCode = Number(request.documents![0].split('/').pop()); actualErrorAttempts[errorCode] = (actualErrorAttempts[errorCode] || 0) + 1; - const error = new GrpcError('Expected exception'); + const error = new gax.GoogleError('Expected exception'); error.code = errorCode; return stream(error); }, }; - return createInstance(overrides).then(firestore => { + return createInstance(overrides).then(async firestore => { const coll = firestore.collection('collectionId'); - const promises: Array> = []; - - Object.keys(expectedErrorAttempts).forEach(errorCode => { - promises.push( - firestore - .getAll(coll.doc(`${errorCode}`)) - .then(() => { - throw new Error('Unexpected success in Promise'); - }) - .catch(err => { - expect(err.code).to.equal(Number(errorCode)); - }) - ); - }); + for (const errorCode of Object.keys(expectedErrorAttempts)) { + await firestore + .getAll(coll.doc(`${errorCode}`)) + .then(() => { + throw new Error('Unexpected success in Promise'); + }) + .catch(err => { + expect(err.code).to.equal(Number(errorCode)); + }); + } - return Promise.all(promises).then(() => { - expect(actualErrorAttempts).to.deep.eq(expectedErrorAttempts); - }); + expect(actualErrorAttempts).to.deep.eq(expectedErrorAttempts); }); }).timeout(5000); diff --git a/dev/test/query.ts b/dev/test/query.ts index 03ff08cb1..6aa4fda80 100644 --- a/dev/test/query.ts +++ b/dev/test/query.ts @@ -18,6 +18,7 @@ import * as extend from 'extend'; import {google} from '../protos/firestore_v1_proto_api'; import {FieldPath, FieldValue, Firestore, setLogFunction} from '../src'; import {DocumentData, DocumentReference, Query, Timestamp} from '../src'; +import {setTimeoutHandler} from '../src/backoff'; import {DocumentSnapshot, DocumentSnapshotBuilder} from '../src/document'; import {QualifiedResourcePath} from '../src/path'; import { @@ -286,12 +287,16 @@ describe('query interface', () => { let firestore: Firestore; beforeEach(() => { + setTimeoutHandler(setImmediate); return createInstance().then(firestoreInstance => { firestore = firestoreInstance; }); }); - afterEach(() => verifyInstance(firestore)); + afterEach(() => { + verifyInstance(firestore); + setTimeoutHandler(setTimeout); + }); it('has isEqual() method', () => { const query = firestore.collection('collectionId'); diff --git a/dev/test/transaction.ts b/dev/test/transaction.ts index 570b38437..0e202261c 100644 --- a/dev/test/transaction.ts +++ b/dev/test/transaction.ts @@ -15,6 +15,7 @@ import {expect, use} from 'chai'; import * as chaiAsPromised from 'chai-as-promised'; import * as extend from 'extend'; +import {GoogleError, Status} from 'google-gax'; import * as through2 from 'through2'; import * as proto from '../protos/firestore_v1_proto_api'; @@ -436,7 +437,8 @@ describe('failed transactions', () => { }); it('limits the retry attempts', () => { - const err = new Error('Retryable error'); + const err = new GoogleError('Server disconnect'); + err.code = Status.UNAVAILABLE; return expect( runTransaction( @@ -457,21 +459,6 @@ describe('failed transactions', () => { ).to.eventually.be.rejectedWith('Final exception'); }); - it('fails on beginTransaction', () => { - return expect( - runTransaction( - () => { - return Promise.resolve('success'); - }, - begin('foo', undefined, new Error('Fails (1) on beginTransaction')), - begin('foo', undefined, new Error('Fails (2) on beginTransaction')), - begin('foo', undefined, new Error('Fails (3) on beginTransaction')), - begin('foo', undefined, new Error('Fails (4) on beginTransaction')), - begin('foo', undefined, new Error('Fails (5) on beginTransaction')) - ) - ).to.eventually.be.rejectedWith('Fails (5) on beginTransaction'); - }); - it('fails on rollback', () => { return expect( runTransaction( diff --git a/dev/test/util/helpers.ts b/dev/test/util/helpers.ts index 9877f6faa..df050d683 100644 --- a/dev/test/util/helpers.ts +++ b/dev/test/util/helpers.ts @@ -20,7 +20,7 @@ import * as through2 from 'through2'; import * as proto from '../../protos/firestore_v1_proto_api'; import {Firestore} from '../../src'; import {ClientPool} from '../../src/pool'; -import {GapicClient, GrpcError} from '../../src/types'; +import {GapicClient} from '../../src/types'; import api = proto.google.firestore.v1; @@ -67,7 +67,7 @@ export interface ApiOverride { listDocuments?: ( request: api.IListDocumentsRequest, options: CallOptions, - callback: (err?: GrpcError | null, resp?: api.IDocument[]) => void + callback: (err?: Error | null, resp?: api.IDocument[]) => void ) => void; batchGetDocuments?: ( request: api.IBatchGetDocumentsRequest diff --git a/dev/test/watch.ts b/dev/test/watch.ts index 5a5dbbc6e..5975ee666 100644 --- a/dev/test/watch.ts +++ b/dev/test/watch.ts @@ -16,6 +16,7 @@ const duplexify = require('duplexify'); import {expect} from 'chai'; import * as extend from 'extend'; +import {GoogleError, Status} from 'google-gax'; import {Transform} from 'stream'; import * as through2 from 'through2'; @@ -23,27 +24,23 @@ import {google} from '../protos/firestore_v1_proto_api'; import { CollectionReference, - FieldPath, - Firestore, - GeoPoint, - setLogFunction, - Timestamp, -} from '../src'; -import { DocumentData, DocumentReference, DocumentSnapshot, + FieldPath, + Firestore, + GeoPoint, Query, QueryDocumentSnapshot, QuerySnapshot, + setLogFunction, + Timestamp, } from '../src'; import {MAX_RETRY_ATTEMPTS, setTimeoutHandler} from '../src/backoff'; import {DocumentSnapshotBuilder} from '../src/document'; import {DocumentChangeType} from '../src/document-change'; import {Serializer} from '../src/serializer'; -import {GrpcError} from '../src/types'; import {createInstance, InvalidApiUsage, verifyInstance} from './util/helpers'; - import api = google.firestore.v1; // Change the argument to 'console.log' to enable debug output. @@ -323,10 +320,10 @@ class StreamHelper { * Destroys the currently active stream with the optionally provided error. * If omitted, the stream is closed with a GRPC Status of UNAVAILABLE. */ - destroyStream(err?: GrpcError): void { + destroyStream(err?: GoogleError): void { if (!err) { - err = new GrpcError('Server disconnect'); - err.code = 14; // Unavailable + err = new GoogleError('Server disconnect'); + err.code = Status.UNAVAILABLE; } this.readStream!.destroy(err); } @@ -820,8 +817,8 @@ describe('Query watch', () => { }); it('stops attempts after maximum retry attempts', () => { - const err = new GrpcError('GRPC Error'); - err.code = Number(10 /* ABORTED */); + const err = new GoogleError('GRPC Error'); + err.code = Status.ABORTED; return watchHelper.runFailedTest( collQueryJSON(), async () => { @@ -863,71 +860,62 @@ describe('Query watch', () => { }); }); - it('retries based on error code', () => { - const expectRetry: {[k: number]: boolean} = { - /* Cancelled */ 1: true, - /* Unknown */ 2: true, - /* InvalidArgument */ 3: false, - /* DeadlineExceeded */ 4: true, - /* NotFound */ 5: false, - /* AlreadyExists */ 6: false, - /* PermissionDenied */ 7: false, - /* ResourceExhausted */ 8: true, - /* FailedPrecondition */ 9: false, - /* Aborted */ 10: true, - /* OutOfRange */ 11: false, - /* Unimplemented */ 12: false, - /* Internal */ 13: true, - /* Unavailable */ 14: true, - /* DataLoss */ 15: false, - /* Unauthenticated */ 16: true, - }; - - let result = Promise.resolve(); - - for (const statusCode in expectRetry) { - if (expectRetry.hasOwnProperty(statusCode)) { - result = result.then(() => { - const err = new GrpcError('GRPC Error'); - err.code = Number(statusCode); - - if (expectRetry[statusCode]) { - return watchHelper.runTest(collQueryJSON(), () => { - watchHelper.sendAddTarget(); - watchHelper.sendCurrent(); - watchHelper.sendSnapshot(1, Buffer.from([0xabcd])); - return watchHelper.await('snapshot').then(() => { + it('retries based on error code', async () => { + const testCases = new Map(); + testCases.set(Status.CANCELLED, true); + testCases.set(Status.UNKNOWN, true); + testCases.set(Status.INVALID_ARGUMENT, false); + testCases.set(Status.DEADLINE_EXCEEDED, true); + testCases.set(Status.NOT_FOUND, false); + testCases.set(Status.ALREADY_EXISTS, false); + testCases.set(Status.PERMISSION_DENIED, false); + testCases.set(Status.RESOURCE_EXHAUSTED, true); + testCases.set(Status.FAILED_PRECONDITION, false); + testCases.set(Status.ABORTED, true); + testCases.set(Status.OUT_OF_RANGE, false); + testCases.set(Status.UNIMPLEMENTED, false); + testCases.set(Status.INTERNAL, true); + testCases.set(Status.UNAVAILABLE, true); + testCases.set(Status.DATA_LOSS, false); + testCases.set(Status.UNAUTHENTICATED, true); + + for (const [statusCode, expectRetry] of testCases) { + const err = new GoogleError('GRPC Error'); + err.code = statusCode; + + if (expectRetry) { + await watchHelper.runTest(collQueryJSON(), () => { + watchHelper.sendAddTarget(); + watchHelper.sendCurrent(); + watchHelper.sendSnapshot(1, Buffer.from([0xabcd])); + return watchHelper.await('snapshot').then(() => { + streamHelper.destroyStream(err); + return streamHelper.awaitReopen(); + }); + }); + } else { + await watchHelper.runFailedTest( + collQueryJSON(), + () => { + watchHelper.sendAddTarget(); + watchHelper.sendCurrent(); + watchHelper.sendSnapshot(1, Buffer.from([0xabcd])); + return watchHelper + .await('snapshot') + .then(() => { streamHelper.destroyStream(err); - return streamHelper.awaitReopen(); + }) + .then(() => { + return streamHelper.await('error'); + }) + .then(() => { + return streamHelper.await('close'); }); - }); - } else { - return watchHelper.runFailedTest( - collQueryJSON(), - () => { - watchHelper.sendAddTarget(); - watchHelper.sendCurrent(); - watchHelper.sendSnapshot(1, Buffer.from([0xabcd])); - return watchHelper - .await('snapshot') - .then(() => { - streamHelper.destroyStream(err); - }) - .then(() => { - return streamHelper.await('error'); - }) - .then(() => { - return streamHelper.await('close'); - }); - }, - 'GRPC Error' - ); - } - }); + }, + 'GRPC Error' + ); } } - - return result; }).timeout(5000); it('retries with unknown code', () => { diff --git a/package.json b/package.json index 05e7fef74..886654964 100644 --- a/package.json +++ b/package.json @@ -49,7 +49,7 @@ "dependencies": { "deep-equal": "^2.0.0", "functional-red-black-tree": "^1.0.1", - "google-gax": "^1.12.0", + "google-gax": "^1.13.0", "readable-stream": "^3.4.0", "through2": "^3.0.0" },