Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use GAX retry config for streams #847

Merged
merged 17 commits into from Jan 2, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
153 changes: 65 additions & 88 deletions dev/src/index.ts
Expand Up @@ -14,12 +14,13 @@
* limitations under the License.
*/

import {CallOptions} from 'google-gax';
import {CallOptions, GoogleError} from 'google-gax';
import {Duplex, PassThrough} from 'stream';
import * as through2 from 'through2';
import {URL} from 'url';

import {google} from '../protos/firestore_v1_proto_api';
import {ExponentialBackoff} from './backoff';
import {fieldsFromJson, timestampFromJson} from './convert';
import {
DocumentSnapshot,
Expand All @@ -40,14 +41,8 @@ import {DocumentReference} from './reference';
import {Serializer} from './serializer';
import {Timestamp} from './timestamp';
import {parseGetAllArguments, Transaction} from './transaction';
import {
ApiMapValue,
GapicClient,
GrpcError,
ReadOptions,
Settings,
} from './types';
import {Deferred, requestTag} from './util';
import {ApiMapValue, GapicClient, ReadOptions, Settings} from './types';
import {Deferred, isPermanentRpcError, requestTag} from './util';
import {
validateBoolean,
validateFunction,
Expand All @@ -59,6 +54,8 @@ import {
} from './validate';
import {WriteBatch} from './write-batch';

import * as clientConfig from './v1/firestore_client_config.json';

import api = google.firestore.v1;

export {
Expand Down Expand Up @@ -1107,62 +1104,56 @@ export class Firestore {
* for further attempts.
*
* @private
* @param attemptsRemaining The number of available attempts.
* @param methodName Name of the Veneer API endpoint that takes a request
* and GAX options.
* @param requestTag A unique client-assigned identifier for this request.
* @param func Method returning a Promise than can be retried.
* @param delayMs How long to wait before issuing a this retry. Defaults to
* zero.
* @returns - A Promise with the function's result if successful within
* `attemptsRemaining`. Otherwise, returns the last rejected Promise.
*/
private _retry<T>(
attemptsRemaining: number,
private async _retry<T>(
methodName: string,
requestTag: string,
func: () => Promise<T>,
delayMs = 0
func: () => Promise<T>
): Promise<T> {
const self = this;
const backoff = new ExponentialBackoff();

const currentDelay = delayMs;
const nextDelay = delayMs || 100;
let lastError: Error | undefined = undefined;

--attemptsRemaining;

return new Promise(resolve => {
setTimeout(resolve, currentDelay);
})
.then(func)
.then(result => {
self._lastSuccessfulRequest = new Date().getTime();
return result;
})
.catch(err => {
if (err.code !== undefined && err.code !== GRPC_UNAVAILABLE) {
logger(
'Firestore._retry',
requestTag,
'Request failed with unrecoverable error:',
err
);
return Promise.reject(err);
}
if (attemptsRemaining === 0) {
logger(
'Firestore._retry',
requestTag,
'Request failed with error:',
err
);
return Promise.reject(err);
}
for (let attempt = 0; attempt < MAX_REQUEST_RETRIES; ++attempt) {
if (lastError) {
logger(
'Firestore._retry',
requestTag,
'Retrying request that failed with error:',
err
lastError
);
return self._retry(attemptsRemaining, requestTag, func, nextDelay);
});
}

try {
await backoff.backoffAndWait();
const result = await func();
this._lastSuccessfulRequest = new Date().getTime();
return result;
} catch (err) {
lastError = err;

if (
err.code !== undefined &&
isPermanentRpcError(err, methodName, clientConfig)
) {
break;
}
}
}

logger(
'Firestore._retry',
requestTag,
'Request failed with error:',
lastError
);
return Promise.reject(lastError);
}

/**
Expand Down Expand Up @@ -1269,51 +1260,37 @@ export class Firestore {
* necessary within the request options.
*
* @private
* @param methodName Name of the veneer API endpoint that takes a request
* @param methodName Name of the Veneer API endpoint that takes a request
* and GAX options.
* @param request The Protobuf request to send.
* @param requestTag A unique client-assigned identifier for this request.
* @param allowRetries Whether this is an idempotent request that can be
* retried.
* @returns A Promise with the request result.
*/
request<T>(
methodName: string,
request: {},
requestTag: string,
allowRetries: boolean
): Promise<T> {
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;
request<T>(methodName: string, request: {}, requestTag: string): Promise<T> {
const callOptions = this.createCallOptions();

return this._clientPool.run(requestTag, gapicClient => {
return this._retry(attempts, requestTag, () => {
return new Promise((resolve, reject) => {
logger(
'Firestore.request',
requestTag,
'Sending request: %j',
request
);
gapicClient[methodName](
request,
callOptions,
(err: GrpcError, result: T) => {
if (err) {
logger('Firestore.request', requestTag, 'Received error:', err);
reject(err);
} else {
logger(
'Firestore.request',
requestTag,
'Received response: %j',
result
);
resolve(result);
}
return new Promise((resolve, reject) => {
logger('Firestore.request', requestTag, 'Sending request: %j', request);
gapicClient[methodName](
request,
callOptions,
(err: GoogleError, result: T) => {
if (err) {
logger('Firestore.request', requestTag, 'Received error:', err);
reject(err);
} else {
logger(
'Firestore.request',
requestTag,
'Received response: %j',
result
);
this._lastSuccessfulRequest = new Date().getTime();
resolve(result);
}
);
});
}
);
});
});
}
Expand Down Expand Up @@ -1349,7 +1326,7 @@ export class Firestore {
// stream.
const lifetime = new Deferred<void>();

this._retry(MAX_REQUEST_RETRIES, requestTag, async () => {
this._retry(methodName, requestTag, async () => {
logger(
'Firestore.requestStream',
requestTag,
Expand Down
14 changes: 2 additions & 12 deletions dev/src/reference.ts
Expand Up @@ -293,12 +293,7 @@ export class DocumentReference implements Serializable {
pageSize: Math.pow(2, 16) - 1,
};
return this._firestore
.request<string[]>(
'listCollectionIds',
request,
tag,
/* allowRetries= */ true
)
.request<string[]>('listCollectionIds', request, tag)
.then(collectionIds => {
const collections: CollectionReference[] = [];

Expand Down Expand Up @@ -2065,12 +2060,7 @@ export class CollectionReference extends Query {
};

return this.firestore
.request<api.IDocument[]>(
'listDocuments',
request,
tag,
/*allowRetries=*/ true
)
.request<api.IDocument[]>('listDocuments', request, tag)
.then(documents => {
// Note that the backend already orders these documents by name,
// so we do not need to manually sort them.
Expand Down
15 changes: 2 additions & 13 deletions dev/src/transaction.ts
Expand Up @@ -50,11 +50,6 @@ import api = proto.google.firestore.v1;
const READ_AFTER_WRITE_ERROR_MSG =
'Firestore transactions require all reads to be executed before all writes.';

/*!
* Transactions can be retried if the initial stream opening errors out.
*/
const ALLOW_RETRIES = true;

/**
* A reference to a transaction.
*
Expand Down Expand Up @@ -372,8 +367,7 @@ export class Transaction {
.request<api.IBeginTransactionResponse>(
'beginTransaction',
request,
this._requestTag,
ALLOW_RETRIES
this._requestTag
)
.then(resp => {
this._transactionId = resp.transaction!;
Expand Down Expand Up @@ -405,12 +399,7 @@ export class Transaction {
transaction: this._transactionId,
};

return this._firestore.request(
'rollback',
request,
this._requestTag,
/* allowRetries= */ false
);
return this._firestore.request('rollback', request, this._requestTag);
}

/**
Expand Down