Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use GAX retry config for streams #847

Merged
merged 17 commits into from Jan 2, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
168 changes: 68 additions & 100 deletions dev/src/index.ts
Expand Up @@ -15,11 +15,12 @@
*/

import * as bun from 'bun';
import {CallOptions} from 'google-gax';
import {CallOptions, ClientConfig, GoogleError} from 'google-gax';
import * as through2 from 'through2';
import {URL} from 'url';

import {google} from '../protos/firestore_v1_proto_api';
import {ExponentialBackoff} from './backoff';
import {fieldsFromJson, timestampFromJson} from './convert';
import {
DocumentSnapshot,
Expand All @@ -40,14 +41,8 @@ import {DocumentReference} from './reference';
import {Serializer} from './serializer';
import {Timestamp} from './timestamp';
import {parseGetAllArguments, Transaction} from './transaction';
import {
ApiMapValue,
GapicClient,
GrpcError,
ReadOptions,
Settings,
} from './types';
import {Deferred, requestTag} from './util';
import {ApiMapValue, GapicClient, ReadOptions, Settings} from './types';
import {Deferred, isPermanentRpcError, requestTag} from './util';
import {
validateBoolean,
validateFunction,
Expand All @@ -61,6 +56,8 @@ import {WriteBatch} from './write-batch';

import api = google.firestore.v1;

import * as clientConfig from './v1/firestore_client_config.json';

export {
CollectionReference,
DocumentReference,
Expand Down Expand Up @@ -950,7 +947,7 @@ export class Firestore {
const self = this;

return self
.readStream('batchGetDocuments', request, requestTag, true)
.readStream('batchGetDocuments', request, requestTag)
.then(stream => {
return new Promise<DocumentSnapshot[]>((resolve, reject) => {
stream
Expand Down Expand Up @@ -1107,62 +1104,55 @@ export class Firestore {
* for further attempts.
*
* @private
* @param attemptsRemaining The number of available attempts.
* @param methodName Name of the Veneer API endpoint that takes a request
* and GAX options.
* @param requestTag A unique client-assigned identifier for this request.
* @param func Method returning a Promise than can be retried.
* @param delayMs How long to wait before issuing a this retry. Defaults to
* zero.
* @returns - A Promise with the function's result if successful within
* `attemptsRemaining`. Otherwise, returns the last rejected Promise.
*/
private _retry<T>(
attemptsRemaining: number,
private async _retry<T>(
methodName: string,
requestTag: string,
func: () => Promise<T>,
delayMs = 0
func: () => Promise<T>
): Promise<T> {
const self = this;
const backoff = new ExponentialBackoff();

const currentDelay = delayMs;
const nextDelay = delayMs || 100;
let lastError: Error | undefined = undefined;

--attemptsRemaining;

return new Promise(resolve => {
setTimeout(resolve, currentDelay);
})
.then(func)
.then(result => {
self._lastSuccessfulRequest = new Date().getTime();
return result;
})
.catch(err => {
if (err.code !== undefined && err.code !== GRPC_UNAVAILABLE) {
logger(
'Firestore._retry',
requestTag,
'Request failed with unrecoverable error:',
err
);
return Promise.reject(err);
}
if (attemptsRemaining === 0) {
logger(
'Firestore._retry',
requestTag,
'Request failed with error:',
err
);
return Promise.reject(err);
}
for (let attempt = 0; attempt < MAX_REQUEST_RETRIES; ++attempt) {
if (lastError) {
logger(
'Firestore._retry',
requestTag,
'Retrying request that failed with error:',
err
lastError
);
return self._retry(attemptsRemaining, requestTag, func, nextDelay);
});
}

try {
const result = await backoff.backoffAndWait().then(func);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mixing await and then in the same line is kind of confusing. Is it the same as the following code?

await backoff.backoffAndWait();
this._lastSuccessfulRequest = new Date().getTime();
return await func();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not quite (the successful request should only be set after await func()). I cleaned it up.

this._lastSuccessfulRequest = new Date().getTime();
return result;
} catch (err) {
lastError = err;

if (
err.code !== undefined &&
isPermanentRpcError(err, methodName, clientConfig)
) {
break;
}
}
}

logger(
'Firestore._retry',
requestTag,
'Request failed with error:',
lastError
);
return Promise.reject(lastError);
}

/**
Expand Down Expand Up @@ -1313,51 +1303,37 @@ export class Firestore {
* necessary within the request options.
*
* @private
* @param methodName Name of the veneer API endpoint that takes a request
* @param methodName Name of the Veneer API endpoint that takes a request
* and GAX options.
* @param request The Protobuf request to send.
* @param requestTag A unique client-assigned identifier for this request.
* @param allowRetries Whether this is an idempotent request that can be
* retried.
* @returns A Promise with the request result.
*/
request<T>(
methodName: string,
request: {},
requestTag: string,
allowRetries: boolean
): Promise<T> {
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;
request<T>(methodName: string, request: {}, requestTag: string): Promise<T> {
const callOptions = this.createCallOptions();

return this._clientPool.run(requestTag, gapicClient => {
return this._retry(attempts, requestTag, () => {
return new Promise((resolve, reject) => {
logger(
'Firestore.request',
requestTag,
'Sending request: %j',
request
);
gapicClient[methodName](
request,
callOptions,
(err: GrpcError, result: T) => {
if (err) {
logger('Firestore.request', requestTag, 'Received error:', err);
reject(err);
} else {
logger(
'Firestore.request',
requestTag,
'Received response: %j',
result
);
resolve(result);
}
return new Promise((resolve, reject) => {
logger('Firestore.request', requestTag, 'Sending request: %j', request);
gapicClient[methodName](
request,
callOptions,
(err: GoogleError, result: T) => {
if (err) {
logger('Firestore.request', requestTag, 'Received error:', err);
reject(err);
} else {
logger(
'Firestore.request',
requestTag,
'Received response: %j',
result
);
this._lastSuccessfulRequest = new Date().getTime();
resolve(result);
}
);
});
}
);
});
});
}
Expand All @@ -1374,17 +1350,13 @@ export class Firestore {
* takes a request and GAX options.
* @param request The Protobuf request to send.
* @param requestTag A unique client-assigned identifier for this request.
* @param allowRetries Whether this is an idempotent request that can be
* retried.
* @returns A Promise with the resulting read-only stream.
*/
readStream(
methodName: string,
request: {},
requestTag: string,
allowRetries: boolean
requestTag: string
): Promise<NodeJS.ReadableStream> {
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;
const callOptions = this.createCallOptions();

const result = new Deferred<NodeJS.ReadableStream>();
Expand All @@ -1395,7 +1367,7 @@ export class Firestore {
// stream.
const lifetime = new Deferred<void>();

this._retry(attempts, requestTag, async () => {
this._retry(methodName, requestTag, async () => {
logger(
'Firestore.readStream',
requestTag,
Expand Down Expand Up @@ -1444,17 +1416,13 @@ export class Firestore {
* GAX options.
* @param request The Protobuf request to send as the first stream message.
* @param requestTag A unique client-assigned identifier for this request.
* @param allowRetries Whether this is an idempotent request that can be
* retried.
* @returns A Promise with the resulting read/write stream.
*/
readWriteStream(
methodName: string,
request: {},
requestTag: string,
allowRetries: boolean
requestTag: string
): Promise<NodeJS.ReadWriteStream> {
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;
const callOptions = this.createCallOptions();

const result = new Deferred<NodeJS.ReadWriteStream>();
Expand All @@ -1465,7 +1433,7 @@ export class Firestore {
// stream.
const lifetime = new Deferred<void>();

this._retry(attempts, requestTag, async () => {
this._retry(methodName, requestTag, async () => {
logger('Firestore.readWriteStream', requestTag, 'Opening stream');
const requestStream = gapicClient[methodName](callOptions);

Expand Down
16 changes: 3 additions & 13 deletions dev/src/reference.ts
Expand Up @@ -294,12 +294,7 @@ export class DocumentReference implements Serializable {
pageSize: Math.pow(2, 16) - 1,
};
return this._firestore
.request<string[]>(
'listCollectionIds',
request,
tag,
/* allowRetries= */ true
)
.request<string[]>('listCollectionIds', request, tag)
.then(collectionIds => {
const collections: CollectionReference[] = [];

Expand Down Expand Up @@ -1833,7 +1828,7 @@ export class Query {
this.firestore.initializeIfNeeded(tag).then(() => {
const request = this.toProto(transactionId);
this._firestore
.readStream('runQuery', request, tag, true)
.readStream('runQuery', request, tag)
.then(backendStream => {
backendStream.on('error', err => {
logger(
Expand Down Expand Up @@ -2064,12 +2059,7 @@ export class CollectionReference extends Query {
};

return this.firestore
.request<api.IDocument[]>(
'listDocuments',
request,
tag,
/*allowRetries=*/ true
)
.request<api.IDocument[]>('listDocuments', request, tag)
.then(documents => {
// Note that the backend already orders these documents by name,
// so we do not need to manually sort them.
Expand Down
15 changes: 2 additions & 13 deletions dev/src/transaction.ts
Expand Up @@ -50,11 +50,6 @@ import api = proto.google.firestore.v1;
const READ_AFTER_WRITE_ERROR_MSG =
'Firestore transactions require all reads to be executed before all writes.';

/*!
* Transactions can be retried if the initial stream opening errors out.
*/
const ALLOW_RETRIES = true;

/**
* A reference to a transaction.
*
Expand Down Expand Up @@ -372,8 +367,7 @@ export class Transaction {
.request<api.IBeginTransactionResponse>(
'beginTransaction',
request,
this._requestTag,
ALLOW_RETRIES
this._requestTag
)
.then(resp => {
this._transactionId = resp.transaction!;
Expand Down Expand Up @@ -405,12 +399,7 @@ export class Transaction {
transaction: this._transactionId,
};

return this._firestore.request(
'rollback',
request,
this._requestTag,
/* allowRetries= */ false
);
return this._firestore.request('rollback', request, this._requestTag);
}

/**
Expand Down