Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add new Firestore.runAsyncTransaction #103

Merged
merged 1 commit into from Mar 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -99,6 +99,43 @@ <T> ApiFuture<T> runTransaction(
@Nonnull final Transaction.Function<T> updateFunction,
@Nonnull TransactionOptions transactionOptions);

/**
* Executes the given updateFunction and then attempts to commit the changes applied within the
* transaction. If any document read within the transaction has changed, the updateFunction will
* be retried. If it fails to commit after 5 attempts, the transaction will fail. <br>
* <br>
* Running a transaction places locks all consumed documents. To unblock other clients, the
* Firestore backend automatically releases all locks after 60 seconds of inactivity and fails all
* transactions that last longer than 270 seconds (see <a
* href="https://firebase.google.com/docs/firestore/quotas#writes_and_transactions">Firestore
* Quotas</a>).
*
* @param updateFunction The function to execute within the transaction context.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One potential downside of your proposal is that it makes it now much easier to invoke transactions that take a long time to execute (as the user may now be inclined to kick off other tasks as well). Can you add something to the comment to explain these downsides? E.g. something like: "Running a transaction places locks all all consumed documents. To unblock other clients, the Firestore backend automatically releases all locks after 60 seconds of inactivity and fails all transactions that last longer than 270 seconds (see https://firebase.google.com/docs/firestore/quotas#writes_and_transactions)"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment added to the javadoc

* @return An ApiFuture that will be resolved with the result from updateFunction.
*/
@Nonnull
<T> ApiFuture<T> runAsyncTransaction(@Nonnull final Transaction.AsyncFunction<T> updateFunction);

/**
* Executes the given updateFunction and then attempts to commit the changes applied within the
* transaction. If any document read within the transaction has changed, the updateFunction will
* be retried. If it fails to commit after the maxmimum number of attemps specified in
* transactionOptions, the transaction will fail. <br>
* <br>
* Running a transaction places locks all consumed documents. To unblock other clients, the
* Firestore backend automatically releases all locks after 60 seconds of inactivity and fails all
* transactions that last longer than 270 seconds (see <a
* href="https://firebase.google.com/docs/firestore/quotas#writes_and_transactions">Firestore
* Quotas</a>).
*
* @param updateFunction The function to execute within the transaction context.
* @return An ApiFuture that will be resolved with the result from updateFunction.
*/
@Nonnull
<T> ApiFuture<T> runAsyncTransaction(
@Nonnull final Transaction.AsyncFunction<T> updateFunction,
@Nonnull TransactionOptions transactionOptions);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move both these methods below the overload for runTransaction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved

/**
* Retrieves multiple documents from Firestore.
*
Expand Down
Expand Up @@ -299,13 +299,30 @@ public <T> ApiFuture<T> runTransaction(
@Nonnull final Transaction.Function<T> updateFunction,
@Nonnull TransactionOptions transactionOptions) {
SettableApiFuture<T> resultFuture = SettableApiFuture.create();
runTransaction(new TransactionAsyncAdapter<>(updateFunction), resultFuture, transactionOptions);
return resultFuture;
}

@Nonnull
@Override
public <T> ApiFuture<T> runAsyncTransaction(
@Nonnull final Transaction.AsyncFunction<T> updateFunction) {
return runAsyncTransaction(updateFunction, TransactionOptions.create());
}

@Nonnull
@Override
public <T> ApiFuture<T> runAsyncTransaction(
@Nonnull final Transaction.AsyncFunction<T> updateFunction,
@Nonnull TransactionOptions transactionOptions) {
SettableApiFuture<T> resultFuture = SettableApiFuture.create();
runTransaction(updateFunction, resultFuture, transactionOptions);
return resultFuture;
}

/** Transaction functions that returns its result in the provided SettableFuture. */
private <T> void runTransaction(
final Transaction.Function<T> transactionCallback,
final Transaction.AsyncFunction<T> transactionCallback,
final SettableApiFuture<T> resultFuture,
final TransactionOptions options) {
// span is intentionally not ended here. It will be ended by runTransactionAttempt on success
Expand All @@ -317,7 +334,7 @@ private <T> void runTransaction(
}

private <T> void runTransactionAttempt(
final Transaction.Function<T> transactionCallback,
final Transaction.AsyncFunction<T> transactionCallback,
final SettableApiFuture<T> resultFuture,
final TransactionOptions options,
final Span span) {
Expand Down Expand Up @@ -384,7 +401,21 @@ private SettableApiFuture<T> invokeUserCallback() {
@Override
public void run() {
try {
callbackResult.set(transactionCallback.updateCallback(transaction));
ApiFuture<T> updateCallback = transactionCallback.updateCallback(transaction);
ApiFutures.addCallback(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I am reading the documentation of addCallback correctly it only runs the ApiFutureCallback on the provided executor. We actually want to run transactionCallback on the user executor. Do you mind fixing that and/or convincing me that I am mistaken?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right.

updateCallback,
new ApiFutureCallback<T>() {
@Override
public void onFailure(Throwable t) {
callbackResult.setException(t);
}

@Override
public void onSuccess(T result) {
callbackResult.set(result);
}
},
MoreExecutors.directExecutor());
} catch (Throwable t) {
callbackResult.setException(t);
}
Expand Down Expand Up @@ -494,4 +525,23 @@ public void close() throws Exception {
firestoreClient.close();
closed = true;
}

private static class TransactionAsyncAdapter<T> implements Transaction.AsyncFunction<T> {
private final Transaction.Function<T> syncFunction;

public TransactionAsyncAdapter(Transaction.Function<T> syncFunction) {
this.syncFunction = syncFunction;
}

@Override
public ApiFuture<T> updateCallback(Transaction transaction) {
SettableApiFuture<T> callbackResult = SettableApiFuture.create();
try {
callbackResult.set(syncFunction.updateCallback(transaction));
} catch (Throwable e) {
callbackResult.setException(e);
}
return callbackResult;
}
}
}
Expand Up @@ -42,7 +42,7 @@ public final class Transaction extends UpdateBuilder<Transaction> {
"Firestore transactions require all reads to be executed before all writes";

/**
* User callback that takes a Firestore Transaction
* User callback that takes a Firestore Transaction.
*
* @param <T> The result type of the user callback.
*/
Expand All @@ -51,6 +51,16 @@ public interface Function<T> {
T updateCallback(Transaction transaction) throws Exception;
}

/**
* User callback that takes a Firestore Async Transaction.
*
* @param <T> The result type of the user async callback.
*/
public interface AsyncFunction<T> {

ApiFuture<T> updateCallback(Transaction transaction);
}

private final ByteString previousTransactionId;
private ByteString transactionId;
private boolean pending;
Expand Down
Expand Up @@ -34,6 +34,7 @@
import static com.google.cloud.firestore.LocalFirestoreHelper.set;
import static com.google.cloud.firestore.LocalFirestoreHelper.update;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -132,6 +133,33 @@ public String updateCallback(Transaction transaction) {
assertEquals(commit(TRANSACTION_ID), requests.get(1));
}

@Test
public void returnsValueAsync() throws Exception {
doReturn(beginResponse())
.doReturn(commitResponse(0, 0))
.when(firestoreMock)
.sendRequest(requestCapture.capture(), Matchers.<UnaryCallable<Message, Message>>any());

ApiFuture<String> transaction =
firestoreMock.runAsyncTransaction(
new Transaction.AsyncFunction<String>() {
@Override
public ApiFuture<String> updateCallback(Transaction transaction) {
Assert.assertEquals("user_provided", Thread.currentThread().getName());
return ApiFutures.immediateFuture("foo");
}
},
options);

assertEquals("foo", transaction.get());

List<Message> requests = requestCapture.getAllValues();
assertEquals(2, requests.size());

assertEquals(begin(), requests.get(0));
assertEquals(commit(TRANSACTION_ID), requests.get(1));
}

@Test
public void canReturnNull() throws Exception {
doReturn(beginResponse())
Expand All @@ -154,6 +182,28 @@ public String updateCallback(Transaction transaction) {
assertEquals(null, transaction.get());
}

@Test
public void canReturnNullAsync() throws Exception {
doReturn(beginResponse())
.doReturn(ApiFutures.immediateFailedFuture(new Exception()))
.doReturn(beginResponse(ByteString.copyFromUtf8("foo2")))
.doReturn(commitResponse(0, 0))
.when(firestoreMock)
.sendRequest(requestCapture.capture(), Matchers.<UnaryCallable<Message, Message>>any());

ApiFuture<String> transaction =
firestoreMock.runAsyncTransaction(
new Transaction.AsyncFunction<String>() {
@Override
public ApiFuture<String> updateCallback(Transaction transaction) {
return ApiFutures.immediateFuture(null);
}
},
options);

assertNull(transaction.get());
}

@Test
public void rollbackOnCallbackError() throws Exception {
doReturn(beginResponse())
Expand Down Expand Up @@ -185,6 +235,37 @@ public String updateCallback(Transaction transaction) throws Exception {
assertEquals(rollback(), requests.get(1));
}

@Test
public void rollbackOnCallbackErrorAsync() throws Exception {
doReturn(beginResponse())
.doReturn(rollbackResponse())
.when(firestoreMock)
.sendRequest(requestCapture.capture(), Matchers.<UnaryCallable<Message, Message>>any());

ApiFuture<String> transaction =
firestoreMock.runAsyncTransaction(
new Transaction.AsyncFunction<String>() {
@Override
public ApiFuture<String> updateCallback(Transaction transaction) {
return ApiFutures.immediateFailedFuture(new Exception("Expected exception"));
}
},
options);

try {
transaction.get();
fail();
} catch (Exception e) {
assertTrue(e.getMessage().endsWith("Expected exception"));
}

List<Message> requests = requestCapture.getAllValues();
assertEquals(2, requests.size());

assertEquals(begin(), requests.get(0));
assertEquals(rollback(), requests.get(1));
}

@Test
public void noRollbackOnBeginFailure() throws Exception {
doReturn(ApiFutures.immediateFailedFuture(new Exception("Expected exception")))
Expand Down Expand Up @@ -213,6 +294,34 @@ public String updateCallback(Transaction transaction) {
assertEquals(1, requests.size());
}

@Test
public void noRollbackOnBeginFailureAsync() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You didn't change this code path, so I think we can remove this test.

Copy link
Contributor Author

@kvakos kvakos Mar 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I duplicate all tests in TransactionTest.java that somehow play with runTransaction and implemented them with runAsyncTransaction. Old sync/blocking methods as it is now implemented also are testing async implementation. I can delete all of *async tests. Can I delete all *async tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost all *async tests were deleted except those with other mocked setup

doReturn(ApiFutures.immediateFailedFuture(new Exception("Expected exception")))
.when(firestoreMock)
.sendRequest(requestCapture.capture(), Matchers.<UnaryCallable<Message, Message>>any());

ApiFuture<String> transaction =
firestoreMock.runAsyncTransaction(
new Transaction.AsyncFunction<String>() {
@Override
public ApiFuture<String> updateCallback(Transaction transaction) {
fail();
return null;
}
},
options);

try {
transaction.get();
fail();
} catch (Exception e) {
assertTrue(e.getMessage().endsWith("Expected exception"));
}

List<Message> requests = requestCapture.getAllValues();
assertEquals(1, requests.size());
}

@Test
public void limitsRetriesWithFailure() throws Exception {
doReturn(beginResponse(ByteString.copyFromUtf8("foo1")))
Expand Down Expand Up @@ -343,6 +452,40 @@ public DocumentSnapshot updateCallback(Transaction transaction)
assertEquals(commit(TRANSACTION_ID), requests.get(2));
}

@Test
public void getDocumentAsync() throws Exception {
doReturn(beginResponse())
.doReturn(commitResponse(0, 0))
.when(firestoreMock)
.sendRequest(requestCapture.capture(), Matchers.<UnaryCallable<Message, Message>>any());

doAnswer(getAllResponse(SINGLE_FIELD_PROTO))
.when(firestoreMock)
.streamRequest(
requestCapture.capture(),
streamObserverCapture.capture(),
Matchers.<ServerStreamingCallable<Message, Message>>any());

ApiFuture<DocumentSnapshot> transaction =
firestoreMock.runAsyncTransaction(
new Transaction.AsyncFunction<DocumentSnapshot>() {
@Override
public ApiFuture<DocumentSnapshot> updateCallback(Transaction transaction) {
return transaction.get(documentReference);
}
},
options);

assertEquals("doc", transaction.get().getId());

List<Message> requests = requestCapture.getAllValues();
assertEquals(3, requests.size());

assertEquals(begin(), requests.get(0));
assertEquals(get(TRANSACTION_ID), requests.get(1));
assertEquals(commit(TRANSACTION_ID), requests.get(2));
}

@Test
public void getMultipleDocuments() throws Exception {
final DocumentReference doc1 = firestoreMock.document("coll/doc1");
Expand Down