Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: respect total request timeout for Query retries #806

Merged
merged 3 commits into from Oct 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -16,7 +16,9 @@

package com.google.cloud.firestore;

import com.google.api.core.ApiClock;
import com.google.api.core.ApiFuture;
import com.google.api.core.NanoClock;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.api.gax.rpc.BidiStreamingCallable;
Expand All @@ -42,6 +44,7 @@
import java.util.Random;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
* Main implementation of the Firestore client. This is the entry point for all Firestore
Expand Down Expand Up @@ -408,6 +411,16 @@ public FirestoreRpc getClient() {
return firestoreClient;
}

@Override
public Duration getTotalRequestTimeout() {
return firestoreOptions.getRetrySettings().getTotalTimeout();
}

@Override
public ApiClock getClock() {
return NanoClock.getDefaultClock();
}

/** Request funnel for all read/write requests. */
@Override
public <RequestT, ResponseT> ApiFuture<ResponseT> sendRequest(
Expand Down
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.firestore;

import com.google.api.core.ApiClock;
import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.api.core.InternalExtensionOnly;
Expand All @@ -24,6 +25,7 @@
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.firestore.spi.v1.FirestoreRpc;
import org.threeten.bp.Duration;

@InternalApi
@InternalExtensionOnly
Expand All @@ -37,6 +39,10 @@ interface FirestoreRpcContext<FS extends Firestore> {

FirestoreRpc getClient();

Duration getTotalRequestTimeout();

ApiClock getClock();

<RequestT, ResponseT> ApiFuture<ResponseT> sendRequest(
RequestT requestT, UnaryCallable<RequestT, ResponseT> callable);

Expand Down
Expand Up @@ -67,6 +67,7 @@
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
* A Query which you can read or listen to. You can also construct refined Query objects by adding
Expand Down Expand Up @@ -1342,6 +1343,7 @@ public void onCompleted() {
responseObserver.onCompleted();
}
},
/* startTimeNanos= */ rpcContext.getClock().nanoTime(),
/* transactionId= */ null,
/* readTime= */ null);
}
Expand Down Expand Up @@ -1478,6 +1480,7 @@ Timestamp getReadTime() {

private void internalStream(
final QuerySnapshotObserver documentObserver,
final long startTimeNanos,
@Nullable final ByteString transactionId,
@Nullable final Timestamp readTime) {
RunQueryRequest.Builder request = RunQueryRequest.newBuilder();
Expand Down Expand Up @@ -1533,30 +1536,20 @@ public void onNext(RunQueryResponse response) {

@Override
public void onError(Throwable throwable) {
// If a non-transactional query failed, attempt to restart.
// Transactional queries are retried via the transaction runner.
if (transactionId == null && isRetryableError(throwable)) {
QueryDocumentSnapshot cursor = lastReceivedDocument.get();
if (shouldRetry(cursor, throwable)) {
Tracing.getTracer()
.getCurrentSpan()
.addAnnotation("Firestore.Query: Retryable Error");

// Restart the query but use the last document we received as the
// query cursor. Note that this it is ok to not use backoff here
// since we are requiring at least a single document result.
QueryDocumentSnapshot cursor = lastReceivedDocument.get();
if (cursor != null) {
if (options.getRequireConsistency()) {
Query.this
.startAfter(cursor)
.internalStream(
documentObserver, /* transactionId= */ null, cursor.getReadTime());
} else {
Query.this
.startAfter(cursor)
.internalStream(
documentObserver, /* transactionId= */ null, /* readTime= */ null);
}
}
Query.this
.startAfter(cursor)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On line 1073 the startAfter() method annotates its argument with @Nonnull; however, it appears that cursor could be null since the null check has been removed from this code here. Am I missing something?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see. shouldRetry() checks that it won't be null. Maybe this could be clearer by modifying shouldRetry() to take the cursor as an argument? Can you think of a way to make it clearer that cursor is guaranteed to not be null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I rearranged the code a bit and used "cursor" as an argument. Let me know if this is what you had in mind.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that is what I had in mind. Thanks.

.internalStream(
documentObserver,
startTimeNanos,
/* transactionId= */ null,
options.getRequireConsistency() ? cursor.getReadTime() : null);

} else {
Tracing.getTracer().getCurrentSpan().addAnnotation("Firestore.Query: Error");
documentObserver.onError(throwable);
Expand All @@ -1573,6 +1566,30 @@ public void onCompleted() {
"numDocuments", AttributeValue.longAttributeValue(numDocuments)));
documentObserver.onCompleted(readTime);
}

boolean shouldRetry(DocumentSnapshot lastDocument, Throwable t) {
if (transactionId != null) {
// Transactional queries are retried via the transaction runner.
return false;
}

if (lastDocument == null) {
// Only retry if we have received a single result. Retries for RPCs with initial
// failure are handled by Google Gax, which also implements backoff.
return false;
}

if (!isRetryableError(t)) {
return false;
}

if (rpcContext.getTotalRequestTimeout().isZero()) {
return true;
}

Duration duration = Duration.ofNanos(rpcContext.getClock().nanoTime() - startTimeNanos);
return duration.compareTo(rpcContext.getTotalRequestTimeout()) < 0;
}
};

rpcContext.streamRequest(request.build(), observer, rpcContext.getClient().runQueryCallable());
Expand Down Expand Up @@ -1642,6 +1659,7 @@ public void onCompleted() {
result.set(querySnapshot);
}
},
/* startTimeNanos= */ rpcContext.getClock().nanoTime(),
transactionId,
/* readTime= */ null);

Expand Down
Expand Up @@ -34,10 +34,13 @@
import static com.google.cloud.firestore.LocalFirestoreHelper.unaryFilter;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;

import com.google.api.core.ApiClock;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.cloud.Timestamp;
Expand All @@ -60,6 +63,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -71,10 +75,29 @@
import org.mockito.Spy;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.threeten.bp.Duration;

@RunWith(MockitoJUnitRunner.class)
public class QueryTest {

static class MockClock implements ApiClock {
long nanoTime = 0;

public void advance(long nanos) {
nanoTime += nanos;
}

@Override
public long nanoTime() {
return nanoTime;
}

@Override
public long millisTime() {
return nanoTime / 1000000;
}
}

@Spy
private final FirestoreImpl firestoreMock =
new FirestoreImpl(
Expand All @@ -87,8 +110,14 @@ public class QueryTest {

private Query query;

private MockClock clock;

@Before
public void before() {
clock = new MockClock();
doReturn(clock).when(firestoreMock).getClock();
doReturn(Duration.ZERO).when(firestoreMock).getTotalRequestTimeout();

query = firestoreMock.collection(COLLECTION_ID);
}

Expand Down Expand Up @@ -1025,7 +1054,86 @@ public void onCompleted() {}
semaphore.acquire();

// Verify the request count
List<RunQueryRequest> requests = runQuery.getAllValues();
assertEquals(1, runQuery.getAllValues().size());
}

@Test
public void onlyRetriesWhenResultSent() throws Exception {
doAnswer(
queryResponse(
FirestoreException.forServerRejection(
Status.DEADLINE_EXCEEDED, "Simulated test failure")))
.when(firestoreMock)
.streamRequest(
runQuery.capture(),
streamObserverCapture.capture(),
Matchers.<ServerStreamingCallable>any());

assertThrows(ExecutionException.class, () -> query.get().get());

// Verify the request count
assertEquals(1, runQuery.getAllValues().size());
}

@Test
public void retriesWithoutTimeout() throws Exception {
final boolean[] returnError = new boolean[] {true};

doAnswer(
(Answer<RunQueryResponse>)
invocation -> {
// Advance clock by an hour
clock.advance(Duration.ofHours(1).toNanos());

if (returnError[0]) {
returnError[0] = false;
return queryResponse(
FirestoreException.forServerRejection(
Status.DEADLINE_EXCEEDED, "Simulated test failure"),
DOCUMENT_NAME + "1")
.answer(invocation);
} else {
return queryResponse(DOCUMENT_NAME + "2").answer(invocation);
}
})
.when(firestoreMock)
.streamRequest(
runQuery.capture(),
streamObserverCapture.capture(),
Matchers.<ServerStreamingCallable>any());

query.get().get();

// Verify the request count
assertEquals(2, runQuery.getAllValues().size());
}

@Test
public void doesNotRetryWithTimeout() {
doReturn(Duration.ofMinutes(1)).when(firestoreMock).getTotalRequestTimeout();

doAnswer(
(Answer<RunQueryResponse>)
invocation -> {
// Advance clock by an hour
clock.advance(Duration.ofHours(1).toNanos());

return queryResponse(
FirestoreException.forServerRejection(
Status.DEADLINE_EXCEEDED, "Simulated test failure"),
DOCUMENT_NAME + "1",
DOCUMENT_NAME + "2")
.answer(invocation);
})
.when(firestoreMock)
.streamRequest(
runQuery.capture(),
streamObserverCapture.capture(),
Matchers.<ServerStreamingCallable>any());

assertThrows(ExecutionException.class, () -> query.get().get());

// Verify the request count
assertEquals(1, runQuery.getAllValues().size());
}

Expand Down
Expand Up @@ -1769,7 +1769,7 @@ public void testRecursiveDeleteWithCustomBulkWriterInstance() throws Exception {
}

@Test
public void testEnforcesTimeouts() throws Exception {
public void testEnforcesTimeouts() {
FirestoreOptions firestoreOptions =
FirestoreOptions.newBuilder()
.setRetrySettings(
Expand Down