Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: base transaction retries on error codes #953

Merged
merged 7 commits into from Mar 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
55 changes: 38 additions & 17 deletions dev/src/transaction.ts
Expand Up @@ -18,6 +18,7 @@ import {GoogleError, Status} from 'google-gax';

import * as proto from '../protos/firestore_v1_proto_api';

import {ExponentialBackoff} from './backoff';
import {DocumentSnapshot, Precondition} from './document';
import {Firestore, WriteBatch} from './index';
import {logger} from './logger';
Expand Down Expand Up @@ -64,6 +65,7 @@ const READ_AFTER_WRITE_ERROR_MSG =
export class Transaction {
private _firestore: Firestore;
private _writeBatch: WriteBatch;
private _backoff: ExponentialBackoff;
private _requestTag: string;
private _transactionId?: Uint8Array;

Expand All @@ -78,6 +80,7 @@ export class Transaction {
this._firestore = firestore;
this._writeBatch = firestore.batch();
this._requestTag = requestTag;
this._backoff = new ExponentialBackoff();
}

/**
Expand Down Expand Up @@ -407,7 +410,7 @@ export class Transaction {
maxAttempts: number
): Promise<T> {
let result: T;
let lastError: Error | undefined = undefined;
let lastError: GoogleError | undefined = undefined;

for (let attempt = 0; attempt < maxAttempts; ++attempt) {
if (lastError) {
Expand All @@ -419,6 +422,9 @@ export class Transaction {
);
}

this._writeBatch._reset();
await this.maybeBackoff(lastError);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

async/await is amazing. ❤️


await this.begin();

try {
Expand All @@ -429,6 +435,8 @@ export class Transaction {
);
}
result = await promise;
await this.commit();
return result;
} catch (err) {
logger(
'Firestore.runTransaction',
Expand All @@ -441,19 +449,10 @@ export class Transaction {

if (isRetryableTransactionError(err)) {
lastError = err;
continue; // Retry full transaction
} else {
return Promise.reject(err); // Callback failed w/ non-retryable error
}
}

try {
await this.commit();
return result; // Success
} catch (err) {
lastError = err;
this._writeBatch._reset();
}
}

logger(
Expand All @@ -464,6 +463,19 @@ export class Transaction {
);
return Promise.reject(lastError);
}

/**
* Delays further operations based on the provided error.
*
* @private
* @return A Promise that resolves after the delay expired.
*/
private async maybeBackoff(error?: GoogleError) {
if (error && error.code === Status.RESOURCE_EXHAUSTED) {
this._backoff.resetToMax();
}
await this._backoff.backoffAndWait();
}
}

/**
Expand Down Expand Up @@ -562,13 +574,22 @@ function validateReadOptions(
}
}

function isRetryableTransactionError(error: Error): boolean {
if (error instanceof GoogleError || 'code' in error) {
// In transactions, the backend returns code ABORTED for reads that fail
// with contention. These errors should be retried for both GoogleError
// and GoogleError-alike errors (in case the prototype hierarchy gets
// stripped somewhere).
return error.code === Status.ABORTED;
function isRetryableTransactionError(error: GoogleError): boolean {
if (error.code !== undefined) {
// This list is based on https://github.com/firebase/firebase-js-sdk/blob/master/packages/firestore/src/core/transaction_runner.ts#L112
switch (error.code) {
case Status.ABORTED:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth distinguishing retry-the-rpc vs retry-the-whole-transaction errors?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The per RPC retry is actually driven by GAX. The one downside of this approach is that if say a RunQuery RPC fails with DEADLINE_EXCEEDED, the RPC will retried by GAX first. If GAX retries don't resolve the issue, then we will restart the transaction.

It's probably possible to turn off GAX retries, but it is a pretty invasive change.

case Status.CANCELLED:
case Status.UNKNOWN:
case Status.DEADLINE_EXCEEDED:
case Status.INTERNAL:
case Status.UNAVAILABLE:
case Status.UNAUTHENTICATED:
case Status.RESOURCE_EXHAUSTED:
return true;
default:
return false;
}
}
return false;
}
71 changes: 54 additions & 17 deletions dev/system-test/firestore.ts
Expand Up @@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import {expect} from 'chai';
import {expect, use} from 'chai';
import * as chaiAsPromised from 'chai-as-promised';

import {
CollectionReference,
DocumentData,
DocumentReference,
DocumentSnapshot,
FieldPath,
FieldValue,
Expand All @@ -33,6 +33,8 @@ import {
import {autoId, Deferred} from '../src/util';
import {Post, postConverter, verifyInstance} from '../test/util/helpers';

use(chaiAsPromised);

const version = require('../../package.json').version;

class DeferredPromise<T> {
Expand Down Expand Up @@ -1991,21 +1993,6 @@ describe('Transaction class', () => {
});
});

it('enforces that updated document exists', () => {
const ref = firestore.collection('col').doc();
return firestore
.runTransaction(updateFunction => {
updateFunction.update(ref, {foo: 'b'});
return Promise.resolve();
})
.then(() => {
expect.fail();
})
.catch(err => {
expect(err.message).to.match(/No document to update/);
});
});

it('has delete() method', () => {
let success = false;
const ref = randomCol.doc('doc');
Expand All @@ -2026,6 +2013,56 @@ describe('Transaction class', () => {
expect(result.exists).to.be.false;
});
});

it('does not retry transaction that fail with FAILED_PRECONDITION', async () => {
const ref = firestore.collection('col').doc();

let attempts = 0;

await expect(
firestore.runTransaction(async transaction => {
++attempts;
transaction.update(ref, {foo: 'b'});
})
).to.eventually.be.rejectedWith('No document to update');

expect(attempts).to.equal(1);
});

it('retries transactions that fail with contention', async () => {
const ref = randomCol.doc('doc');

let firstTransaction, secondTransaction: Promise<void>;
let attempts = 0;

// Create two transactions that both read and update the same document.
// `contentionPromise` is used to ensure that both transactions are active
// on commit, which causes one of transactions to fail with Code ABORTED
// and be retried.
const contentionPromise = new Deferred<void>();

firstTransaction = firestore.runTransaction(async transaction => {
++attempts;
await transaction.get(ref);
await contentionPromise.promise;
transaction.set(ref, {first: true}, {merge: true});
});

secondTransaction = firestore.runTransaction(async transaction => {
++attempts;
await transaction.get(ref);
contentionPromise.resolve();
transaction.set(ref, {second: true}, {merge: true});
});

await firstTransaction;
await secondTransaction;

expect(attempts).to.equal(3);

const finalSnapshot = await ref.get();
expect(finalSnapshot.data()).to.deep.equal({first: true, second: true});
});
});

describe('WriteBatch class', () => {
Expand Down