Skip to content

Commit

Permalink
feat: use GAX retry config for streams (#847)
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidt-sebastian committed Jan 2, 2020
1 parent 2ad8acc commit 218a4c6
Show file tree
Hide file tree
Showing 16 changed files with 275 additions and 431 deletions.
4 changes: 1 addition & 3 deletions README.md
Expand Up @@ -15,9 +15,7 @@ This is the Node.js Server SDK for [Google Cloud Firestore](https://firebase.goo

This Cloud Firestore Server SDK uses Google’s Cloud Identity and Access Management for authentication and should only be used in trusted environments. Your Cloud Identity credentials allow you bypass all access restrictions and provide read and write access to all data in your Cloud Firestore project.

The Cloud Firestore Server SDKs are designed to manage the full set of data in your Cloud Firestore project and work best with reliable network connectivity. Data operations performed via these SDKs directly access the Cloud Firestore backend and all document reads and writes are optimized for high throughput.

Applications that use Google's Server SDKs should not be used in end-user environments, such as on phones or on publicly hosted websites. If you are developing a Web or Node.js application that accesses Cloud Firestore on behalf of end users, use the firebase Client SDK.
Applications that use Google's Server SDKs should not be used in end-user environments, such as on phones or on publicly hosted websites. If you are developing a Web or Node.js application that accesses Cloud Firestore on behalf of end users, use the Firebase Client SDK.

**Note:** This Cloud Firestore Server SDK does not support Firestore databases created in [Datastore mode](https://cloud.google.com/datastore/docs/firestore-or-datastore#in_datastore_mode). To access these databases, use the [Datastore SDK](https://www.npmjs.com/package/@google-cloud/datastore).

Expand Down
4 changes: 2 additions & 2 deletions dev/src/backoff.ts
Expand Up @@ -80,7 +80,7 @@ export function setTimeoutHandler(
*
* @private
*/
export interface ExponentialBackoffOptions {
export interface ExponentialBackoffSetting {
/** Optional override for the initial retry delay. */
initialDelayMs?: number;
/** Optional override for the exponential backoff factor. */
Expand Down Expand Up @@ -162,7 +162,7 @@ export class ExponentialBackoff {
*/
private awaitingBackoffCompletion = false;

constructor(options: ExponentialBackoffOptions = {}) {
constructor(options: ExponentialBackoffSetting = {}) {
this.initialDelayMs =
options.initialDelayMs !== undefined
? options.initialDelayMs
Expand Down
169 changes: 76 additions & 93 deletions dev/src/index.ts
Expand Up @@ -14,12 +14,13 @@
* limitations under the License.
*/

import {CallOptions} from 'google-gax';
import {CallOptions, GoogleError} from 'google-gax';
import {Duplex, PassThrough} from 'stream';
import * as through2 from 'through2';
import {URL} from 'url';

import {google} from '../protos/firestore_v1_proto_api';
import {ExponentialBackoff, ExponentialBackoffSetting} from './backoff';
import {fieldsFromJson, timestampFromJson} from './convert';
import {
DocumentSnapshot,
Expand All @@ -40,14 +41,8 @@ import {DocumentReference} from './reference';
import {Serializer} from './serializer';
import {Timestamp} from './timestamp';
import {parseGetAllArguments, Transaction} from './transaction';
import {
ApiMapValue,
GapicClient,
GrpcError,
ReadOptions,
Settings,
} from './types';
import {Deferred, requestTag} from './util';
import {ApiMapValue, GapicClient, ReadOptions, Settings} from './types';
import {Deferred, isPermanentRpcError, requestTag} from './util';
import {
validateBoolean,
validateFunction,
Expand All @@ -59,6 +54,9 @@ import {
} from './validate';
import {WriteBatch} from './write-batch';

import {interfaces} from './v1/firestore_client_config.json';
const serviceConfig = interfaces['google.firestore.v1.Firestore'];

import api = google.firestore.v1;

export {
Expand Down Expand Up @@ -143,11 +141,6 @@ const DEFAULT_MAX_IDLE_CHANNELS = 1;
*/
const MAX_CONCURRENT_REQUESTS_PER_CLIENT = 100;

/*!
* GRPC Error code for 'UNAVAILABLE'.
*/
const GRPC_UNAVAILABLE = 14;

/**
* Document data (e.g. for use with
* [set()]{@link DocumentReference#set}) consisting of fields mapped
Expand Down Expand Up @@ -265,6 +258,12 @@ export class Firestore {
*/
private _settings: Settings = {};

/**
* Settings for the exponential backoff used by the streaming endpoints.
* @private
*/
private _backoffSettings: ExponentialBackoffSetting;

/**
* Whether the initialization settings can still be changed by invoking
* `settings()`.
Expand Down Expand Up @@ -368,6 +367,13 @@ export class Firestore {
this.validateAndApplySettings({...settings, ...libraryHeader});
}

const retryConfig = serviceConfig.retry_params.default;
this._backoffSettings = {
initialDelayMs: retryConfig.initial_retry_delay_millis,
maxDelayMs: retryConfig.max_retry_delay_millis,
backoffFactor: retryConfig.retry_delay_multiplier,
};

// GCF currently tears down idle connections after two minutes. Requests
// that are issued after this period may fail. On GCF, we therefore issue
// these requests as part of a transaction so that we can safely retry until
Expand Down Expand Up @@ -1107,62 +1113,53 @@ export class Firestore {
* for further attempts.
*
* @private
* @param attemptsRemaining The number of available attempts.
* @param methodName Name of the Veneer API endpoint that takes a request
* and GAX options.
* @param requestTag A unique client-assigned identifier for this request.
* @param func Method returning a Promise than can be retried.
* @param delayMs How long to wait before issuing a this retry. Defaults to
* zero.
* @returns - A Promise with the function's result if successful within
* `attemptsRemaining`. Otherwise, returns the last rejected Promise.
*/
private _retry<T>(
attemptsRemaining: number,
private async _retry<T>(
methodName: string,
requestTag: string,
func: () => Promise<T>,
delayMs = 0
func: () => Promise<T>
): Promise<T> {
const self = this;
const backoff = new ExponentialBackoff();

const currentDelay = delayMs;
const nextDelay = delayMs || 100;
let lastError: Error | undefined = undefined;

--attemptsRemaining;

return new Promise(resolve => {
setTimeout(resolve, currentDelay);
})
.then(func)
.then(result => {
self._lastSuccessfulRequest = new Date().getTime();
return result;
})
.catch(err => {
if (err.code !== undefined && err.code !== GRPC_UNAVAILABLE) {
logger(
'Firestore._retry',
requestTag,
'Request failed with unrecoverable error:',
err
);
return Promise.reject(err);
}
if (attemptsRemaining === 0) {
logger(
'Firestore._retry',
requestTag,
'Request failed with error:',
err
);
return Promise.reject(err);
}
for (let attempt = 0; attempt < MAX_REQUEST_RETRIES; ++attempt) {
if (lastError) {
logger(
'Firestore._retry',
requestTag,
'Retrying request that failed with error:',
err
lastError
);
return self._retry(attemptsRemaining, requestTag, func, nextDelay);
});
}

try {
await backoff.backoffAndWait();
const result = await func();
this._lastSuccessfulRequest = new Date().getTime();
return result;
} catch (err) {
lastError = err;

if (isPermanentRpcError(err, methodName, serviceConfig)) {
break;
}
}
}

logger(
'Firestore._retry',
requestTag,
'Request failed with error:',
lastError
);
return Promise.reject(lastError);
}

/**
Expand Down Expand Up @@ -1269,51 +1266,37 @@ export class Firestore {
* necessary within the request options.
*
* @private
* @param methodName Name of the veneer API endpoint that takes a request
* @param methodName Name of the Veneer API endpoint that takes a request
* and GAX options.
* @param request The Protobuf request to send.
* @param requestTag A unique client-assigned identifier for this request.
* @param allowRetries Whether this is an idempotent request that can be
* retried.
* @returns A Promise with the request result.
*/
request<T>(
methodName: string,
request: {},
requestTag: string,
allowRetries: boolean
): Promise<T> {
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;
request<T>(methodName: string, request: {}, requestTag: string): Promise<T> {
const callOptions = this.createCallOptions();

return this._clientPool.run(requestTag, gapicClient => {
return this._retry(attempts, requestTag, () => {
return new Promise((resolve, reject) => {
logger(
'Firestore.request',
requestTag,
'Sending request: %j',
request
);
gapicClient[methodName](
request,
callOptions,
(err: GrpcError, result: T) => {
if (err) {
logger('Firestore.request', requestTag, 'Received error:', err);
reject(err);
} else {
logger(
'Firestore.request',
requestTag,
'Received response: %j',
result
);
resolve(result);
}
return new Promise((resolve, reject) => {
logger('Firestore.request', requestTag, 'Sending request: %j', request);
gapicClient[methodName](
request,
callOptions,
(err: GoogleError, result: T) => {
if (err) {
logger('Firestore.request', requestTag, 'Received error:', err);
reject(err);
} else {
logger(
'Firestore.request',
requestTag,
'Received response: %j',
result
);
this._lastSuccessfulRequest = new Date().getTime();
resolve(result);
}
);
});
}
);
});
});
}
Expand Down Expand Up @@ -1349,7 +1332,7 @@ export class Firestore {
// stream.
const lifetime = new Deferred<void>();

this._retry(MAX_REQUEST_RETRIES, requestTag, async () => {
this._retry(methodName, requestTag, async () => {
logger(
'Firestore.requestStream',
requestTag,
Expand Down
14 changes: 2 additions & 12 deletions dev/src/reference.ts
Expand Up @@ -293,12 +293,7 @@ export class DocumentReference implements Serializable {
pageSize: Math.pow(2, 16) - 1,
};
return this._firestore
.request<string[]>(
'listCollectionIds',
request,
tag,
/* allowRetries= */ true
)
.request<string[]>('listCollectionIds', request, tag)
.then(collectionIds => {
const collections: CollectionReference[] = [];

Expand Down Expand Up @@ -2065,12 +2060,7 @@ export class CollectionReference extends Query {
};

return this.firestore
.request<api.IDocument[]>(
'listDocuments',
request,
tag,
/*allowRetries=*/ true
)
.request<api.IDocument[]>('listDocuments', request, tag)
.then(documents => {
// Note that the backend already orders these documents by name,
// so we do not need to manually sort them.
Expand Down
15 changes: 2 additions & 13 deletions dev/src/transaction.ts
Expand Up @@ -50,11 +50,6 @@ import api = proto.google.firestore.v1;
const READ_AFTER_WRITE_ERROR_MSG =
'Firestore transactions require all reads to be executed before all writes.';

/*!
* Transactions can be retried if the initial stream opening errors out.
*/
const ALLOW_RETRIES = true;

/**
* A reference to a transaction.
*
Expand Down Expand Up @@ -372,8 +367,7 @@ export class Transaction {
.request<api.IBeginTransactionResponse>(
'beginTransaction',
request,
this._requestTag,
ALLOW_RETRIES
this._requestTag
)
.then(resp => {
this._transactionId = resp.transaction!;
Expand Down Expand Up @@ -405,12 +399,7 @@ export class Transaction {
transaction: this._transactionId,
};

return this._firestore.request(
'rollback',
request,
this._requestTag,
/* allowRetries= */ false
);
return this._firestore.request('rollback', request, this._requestTag);
}

/**
Expand Down
4 changes: 0 additions & 4 deletions dev/src/types.ts
Expand Up @@ -36,10 +36,6 @@ export type GapicClient = any;
// tslint:disable-next-line:no-any
export type RBTree = any;

export class GrpcError extends Error {
code?: number;
}

/**
* Settings used to directly configure a `Firestore` instance.
*/
Expand Down
25 changes: 25 additions & 0 deletions dev/src/util.ts
Expand Up @@ -14,6 +14,8 @@
* limitations under the License.
*/

import {GoogleError, ServiceConfig, Status} from 'google-gax';

/**
* A Promise implementation that supports deferred resolution.
* @private
Expand Down Expand Up @@ -92,3 +94,26 @@ export function isEmpty(value: {}): boolean {
export function isFunction(value: unknown): boolean {
return typeof value === 'function';
}

/**
* Determines whether the provided error is considered permanent for the given
* RPC.
*
* @private
*/
export function isPermanentRpcError(
err: GoogleError,
methodName: string,
config: ServiceConfig
): boolean {
if (err.code !== undefined) {
const serviceConfigName = methodName[0].toUpperCase() + methodName.slice(1);
const retryCodeNames = config.methods[serviceConfigName]!.retry_codes_name!;
const retryCodes = config.retry_codes![retryCodeNames].map(
errorName => Status[errorName as keyof typeof Status]
);
return retryCodes.indexOf(err.code) === -1;
} else {
return false;
}
}

0 comments on commit 218a4c6

Please sign in to comment.