Skip to content

Commit

Permalink
fix: respect total request timeout for Query retries (#806)
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidt-sebastian committed Oct 29, 2021
1 parent 6c71649 commit feb1921
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 22 deletions.
Expand Up @@ -16,7 +16,9 @@

package com.google.cloud.firestore;

import com.google.api.core.ApiClock;
import com.google.api.core.ApiFuture;
import com.google.api.core.NanoClock;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.api.gax.rpc.BidiStreamingCallable;
Expand All @@ -42,6 +44,7 @@
import java.util.Random;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
* Main implementation of the Firestore client. This is the entry point for all Firestore
Expand Down Expand Up @@ -408,6 +411,16 @@ public FirestoreRpc getClient() {
return firestoreClient;
}

@Override
public Duration getTotalRequestTimeout() {
return firestoreOptions.getRetrySettings().getTotalTimeout();
}

@Override
public ApiClock getClock() {
return NanoClock.getDefaultClock();
}

/** Request funnel for all read/write requests. */
@Override
public <RequestT, ResponseT> ApiFuture<ResponseT> sendRequest(
Expand Down
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.firestore;

import com.google.api.core.ApiClock;
import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.api.core.InternalExtensionOnly;
Expand All @@ -24,6 +25,7 @@
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.firestore.spi.v1.FirestoreRpc;
import org.threeten.bp.Duration;

@InternalApi
@InternalExtensionOnly
Expand All @@ -37,6 +39,10 @@ interface FirestoreRpcContext<FS extends Firestore> {

FirestoreRpc getClient();

Duration getTotalRequestTimeout();

ApiClock getClock();

<RequestT, ResponseT> ApiFuture<ResponseT> sendRequest(
RequestT requestT, UnaryCallable<RequestT, ResponseT> callable);

Expand Down
Expand Up @@ -67,6 +67,7 @@
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
* A Query which you can read or listen to. You can also construct refined Query objects by adding
Expand Down Expand Up @@ -1342,6 +1343,7 @@ public void onCompleted() {
responseObserver.onCompleted();
}
},
/* startTimeNanos= */ rpcContext.getClock().nanoTime(),
/* transactionId= */ null,
/* readTime= */ null);
}
Expand Down Expand Up @@ -1478,6 +1480,7 @@ Timestamp getReadTime() {

private void internalStream(
final QuerySnapshotObserver documentObserver,
final long startTimeNanos,
@Nullable final ByteString transactionId,
@Nullable final Timestamp readTime) {
RunQueryRequest.Builder request = RunQueryRequest.newBuilder();
Expand Down Expand Up @@ -1533,30 +1536,20 @@ public void onNext(RunQueryResponse response) {

@Override
public void onError(Throwable throwable) {
// If a non-transactional query failed, attempt to restart.
// Transactional queries are retried via the transaction runner.
if (transactionId == null && isRetryableError(throwable)) {
QueryDocumentSnapshot cursor = lastReceivedDocument.get();
if (shouldRetry(cursor, throwable)) {
Tracing.getTracer()
.getCurrentSpan()
.addAnnotation("Firestore.Query: Retryable Error");

// Restart the query but use the last document we received as the
// query cursor. Note that this it is ok to not use backoff here
// since we are requiring at least a single document result.
QueryDocumentSnapshot cursor = lastReceivedDocument.get();
if (cursor != null) {
if (options.getRequireConsistency()) {
Query.this
.startAfter(cursor)
.internalStream(
documentObserver, /* transactionId= */ null, cursor.getReadTime());
} else {
Query.this
.startAfter(cursor)
.internalStream(
documentObserver, /* transactionId= */ null, /* readTime= */ null);
}
}
Query.this
.startAfter(cursor)
.internalStream(
documentObserver,
startTimeNanos,
/* transactionId= */ null,
options.getRequireConsistency() ? cursor.getReadTime() : null);

} else {
Tracing.getTracer().getCurrentSpan().addAnnotation("Firestore.Query: Error");
documentObserver.onError(throwable);
Expand All @@ -1573,6 +1566,30 @@ public void onCompleted() {
"numDocuments", AttributeValue.longAttributeValue(numDocuments)));
documentObserver.onCompleted(readTime);
}

boolean shouldRetry(DocumentSnapshot lastDocument, Throwable t) {
if (transactionId != null) {
// Transactional queries are retried via the transaction runner.
return false;
}

if (lastDocument == null) {
// Only retry if we have received a single result. Retries for RPCs with initial
// failure are handled by Google Gax, which also implements backoff.
return false;
}

if (!isRetryableError(t)) {
return false;
}

if (rpcContext.getTotalRequestTimeout().isZero()) {
return true;
}

Duration duration = Duration.ofNanos(rpcContext.getClock().nanoTime() - startTimeNanos);
return duration.compareTo(rpcContext.getTotalRequestTimeout()) < 0;
}
};

rpcContext.streamRequest(request.build(), observer, rpcContext.getClient().runQueryCallable());
Expand Down Expand Up @@ -1642,6 +1659,7 @@ public void onCompleted() {
result.set(querySnapshot);
}
},
/* startTimeNanos= */ rpcContext.getClock().nanoTime(),
transactionId,
/* readTime= */ null);

Expand Down
Expand Up @@ -34,10 +34,13 @@
import static com.google.cloud.firestore.LocalFirestoreHelper.unaryFilter;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;

import com.google.api.core.ApiClock;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.cloud.Timestamp;
Expand All @@ -60,6 +63,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -71,10 +75,29 @@
import org.mockito.Spy;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.threeten.bp.Duration;

@RunWith(MockitoJUnitRunner.class)
public class QueryTest {

static class MockClock implements ApiClock {
long nanoTime = 0;

public void advance(long nanos) {
nanoTime += nanos;
}

@Override
public long nanoTime() {
return nanoTime;
}

@Override
public long millisTime() {
return nanoTime / 1000000;
}
}

@Spy
private final FirestoreImpl firestoreMock =
new FirestoreImpl(
Expand All @@ -87,8 +110,14 @@ public class QueryTest {

private Query query;

private MockClock clock;

@Before
public void before() {
clock = new MockClock();
doReturn(clock).when(firestoreMock).getClock();
doReturn(Duration.ZERO).when(firestoreMock).getTotalRequestTimeout();

query = firestoreMock.collection(COLLECTION_ID);
}

Expand Down Expand Up @@ -1025,7 +1054,86 @@ public void onCompleted() {}
semaphore.acquire();

// Verify the request count
List<RunQueryRequest> requests = runQuery.getAllValues();
assertEquals(1, runQuery.getAllValues().size());
}

@Test
public void onlyRetriesWhenResultSent() throws Exception {
doAnswer(
queryResponse(
FirestoreException.forServerRejection(
Status.DEADLINE_EXCEEDED, "Simulated test failure")))
.when(firestoreMock)
.streamRequest(
runQuery.capture(),
streamObserverCapture.capture(),
Matchers.<ServerStreamingCallable>any());

assertThrows(ExecutionException.class, () -> query.get().get());

// Verify the request count
assertEquals(1, runQuery.getAllValues().size());
}

@Test
public void retriesWithoutTimeout() throws Exception {
final boolean[] returnError = new boolean[] {true};

doAnswer(
(Answer<RunQueryResponse>)
invocation -> {
// Advance clock by an hour
clock.advance(Duration.ofHours(1).toNanos());

if (returnError[0]) {
returnError[0] = false;
return queryResponse(
FirestoreException.forServerRejection(
Status.DEADLINE_EXCEEDED, "Simulated test failure"),
DOCUMENT_NAME + "1")
.answer(invocation);
} else {
return queryResponse(DOCUMENT_NAME + "2").answer(invocation);
}
})
.when(firestoreMock)
.streamRequest(
runQuery.capture(),
streamObserverCapture.capture(),
Matchers.<ServerStreamingCallable>any());

query.get().get();

// Verify the request count
assertEquals(2, runQuery.getAllValues().size());
}

@Test
public void doesNotRetryWithTimeout() {
doReturn(Duration.ofMinutes(1)).when(firestoreMock).getTotalRequestTimeout();

doAnswer(
(Answer<RunQueryResponse>)
invocation -> {
// Advance clock by an hour
clock.advance(Duration.ofHours(1).toNanos());

return queryResponse(
FirestoreException.forServerRejection(
Status.DEADLINE_EXCEEDED, "Simulated test failure"),
DOCUMENT_NAME + "1",
DOCUMENT_NAME + "2")
.answer(invocation);
})
.when(firestoreMock)
.streamRequest(
runQuery.capture(),
streamObserverCapture.capture(),
Matchers.<ServerStreamingCallable>any());

assertThrows(ExecutionException.class, () -> query.get().get());

// Verify the request count
assertEquals(1, runQuery.getAllValues().size());
}

Expand Down
Expand Up @@ -1769,7 +1769,7 @@ public void testRecursiveDeleteWithCustomBulkWriterInstance() throws Exception {
}

@Test
public void testEnforcesTimeouts() throws Exception {
public void testEnforcesTimeouts() {
FirestoreOptions firestoreOptions =
FirestoreOptions.newBuilder()
.setRetrySettings(
Expand Down

0 comments on commit feb1921

Please sign in to comment.