Skip to content

Commit

Permalink
feat: use error codes for transaction retries
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidt-sebastian committed Mar 5, 2020
1 parent 7dc0b23 commit 5c77e0c
Show file tree
Hide file tree
Showing 3 changed files with 321 additions and 149 deletions.
61 changes: 44 additions & 17 deletions dev/src/transaction.ts
Expand Up @@ -18,6 +18,7 @@ import {GoogleError, Status} from 'google-gax';

import * as proto from '../protos/firestore_v1_proto_api';

import {ExponentialBackoff} from './backoff';
import {DocumentSnapshot, Precondition} from './document';
import {Firestore, WriteBatch} from './index';
import {logger} from './logger';
Expand Down Expand Up @@ -64,6 +65,7 @@ const READ_AFTER_WRITE_ERROR_MSG =
export class Transaction {
private _firestore: Firestore;
private _writeBatch: WriteBatch;
private _backoff: ExponentialBackoff;
private _requestTag: string;
private _transactionId?: Uint8Array;

Expand All @@ -78,6 +80,7 @@ export class Transaction {
this._firestore = firestore;
this._writeBatch = firestore.batch();
this._requestTag = requestTag;
this._backoff = new ExponentialBackoff();
}

/**
Expand Down Expand Up @@ -407,7 +410,7 @@ export class Transaction {
maxAttempts: number
): Promise<T> {
let result: T;
let lastError: Error | undefined = undefined;
let lastError: GoogleError | undefined = undefined;

for (let attempt = 0; attempt < maxAttempts; ++attempt) {
if (lastError) {
Expand All @@ -419,6 +422,9 @@ export class Transaction {
);
}

this._writeBatch._reset();
await this.maybeBackoff(lastError);

await this.begin();

try {
Expand All @@ -429,6 +435,8 @@ export class Transaction {
);
}
result = await promise;
await this.commit();
return result;
} catch (err) {
logger(
'Firestore.runTransaction',
Expand All @@ -441,19 +449,10 @@ export class Transaction {

if (isRetryableTransactionError(err)) {
lastError = err;
continue; // Retry full transaction
} else {
return Promise.reject(err); // Callback failed w/ non-retryable error
}
}

try {
await this.commit();
return result; // Success
} catch (err) {
lastError = err;
this._writeBatch._reset();
}
}

logger(
Expand All @@ -464,6 +463,25 @@ export class Transaction {
);
return Promise.reject(lastError);
}

/**
* Delays further operations based on the provided error.
*
* @private
* @return A Promise that resolves after the delay expired.
*/
private async maybeBackoff(error?: GoogleError) {
if (error) {
if (error.code === Status.RESOURCE_EXHAUSTED) {
this._backoff.resetToMax();
} else if (error.code === Status.ABORTED) {
// We don't backoff for ABORTED to avoid starvation
this._backoff.reset();
}
}

await this._backoff.backoffAndWait();
}
}

/**
Expand Down Expand Up @@ -562,13 +580,22 @@ function validateReadOptions(
}
}

function isRetryableTransactionError(error: Error): boolean {
if (error instanceof GoogleError || 'code' in error) {
// In transactions, the backend returns code ABORTED for reads that fail
// with contention. These errors should be retried for both GoogleError
// and GoogleError-alike errors (in case the prototype hierarchy gets
// stripped somewhere).
return error.code === Status.ABORTED;
function isRetryableTransactionError(error: GoogleError): boolean {
if (error.code !== undefined) {
// This list is based on https://github.com/firebase/firebase-js-sdk/blob/master/packages/firestore/src/core/transaction_runner.ts#L112
switch (error.code) {
case Status.ABORTED:
case Status.CANCELLED:
case Status.UNKNOWN:
case Status.DEADLINE_EXCEEDED:
case Status.INTERNAL:
case Status.UNAVAILABLE:
case Status.UNAUTHENTICATED:
case Status.RESOURCE_EXHAUSTED:
return true;
default:
return false;
}
}
return false;
}
73 changes: 56 additions & 17 deletions dev/system-test/firestore.ts
Expand Up @@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import {expect} from 'chai';
import {expect, use} from 'chai';
import * as chaiAsPromised from 'chai-as-promised';

import {
CollectionReference,
DocumentData,
DocumentReference,
DocumentSnapshot,
FieldPath,
FieldValue,
Expand All @@ -33,6 +33,8 @@ import {
import {autoId, Deferred} from '../src/util';
import {Post, postConverter, verifyInstance} from '../test/util/helpers';

use(chaiAsPromised);

const version = require('../../package.json').version;

class DeferredPromise<T> {
Expand Down Expand Up @@ -1991,21 +1993,6 @@ describe('Transaction class', () => {
});
});

it('enforces that updated document exists', () => {
const ref = firestore.collection('col').doc();
return firestore
.runTransaction(updateFunction => {
updateFunction.update(ref, {foo: 'b'});
return Promise.resolve();
})
.then(() => {
expect.fail();
})
.catch(err => {
expect(err.message).to.match(/No document to update/);
});
});

it('has delete() method', () => {
let success = false;
const ref = randomCol.doc('doc');
Expand All @@ -2026,6 +2013,58 @@ describe('Transaction class', () => {
expect(result.exists).to.be.false;
});
});

it('does not retry transaction that fail with FAILED_PRECONDITION', async () => {
const ref = firestore.collection('col').doc();

let attempts = 0;

await expect(
firestore.runTransaction(async transaction => {
++attempts;
transaction.update(ref, {foo: 'b'});
})
).to.eventually.be.rejectedWith('No document to update');

expect(attempts).to.equal(1);
});

it('retries transactions that fail with contention', async () => {
const ref = randomCol.doc('doc');

let firstTransaction, secondTransaction: Promise<void>;
let firstTransactionAttempts = 0,
secondTransactionAttempts = 0;

// Create two transactions that both read and update the same document.
// `contentionPromise` is used to ensure that both transactions are active
// when we try to commit, which causes the second transaction to fail with
// Code ABORTED and be retried.
const contentionPromise = new Deferred<void>();

firstTransaction = firestore.runTransaction(async transaction => {
++firstTransactionAttempts;
await transaction.get(ref);
await contentionPromise.promise;
transaction.set(ref, {first: true}, {merge: true});
});

secondTransaction = firestore.runTransaction(async transaction => {
++secondTransactionAttempts;
await transaction.get(ref);
contentionPromise.resolve();
transaction.set(ref, {second: true}, {merge: true});
});

await firstTransaction;
await secondTransaction;

expect(firstTransactionAttempts).to.equal(1);
expect(secondTransactionAttempts).to.equal(2);

const finalSnapshot = await ref.get();
expect(finalSnapshot.data()).to.deep.equal({first: true, second: true});
});
});

describe('WriteBatch class', () => {
Expand Down

0 comments on commit 5c77e0c

Please sign in to comment.