From feb1921b39fc7630aa6549025c5ffe526e080d54 Mon Sep 17 00:00:00 2001 From: Sebastian Schmidt Date: Fri, 29 Oct 2021 10:11:23 -0600 Subject: [PATCH] fix: respect total request timeout for Query retries (#806) --- .../google/cloud/firestore/FirestoreImpl.java | 13 +++ .../cloud/firestore/FirestoreRpcContext.java | 6 + .../com/google/cloud/firestore/Query.java | 58 +++++---- .../com/google/cloud/firestore/QueryTest.java | 110 +++++++++++++++++- .../cloud/firestore/it/ITSystemTest.java | 2 +- 5 files changed, 167 insertions(+), 22 deletions(-) diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java index a4ea4f0f2..ba9a2e985 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java @@ -16,7 +16,9 @@ package com.google.cloud.firestore; +import com.google.api.core.ApiClock; import com.google.api.core.ApiFuture; +import com.google.api.core.NanoClock; import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.ApiStreamObserver; import com.google.api.gax.rpc.BidiStreamingCallable; @@ -42,6 +44,7 @@ import java.util.Random; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.threeten.bp.Duration; /** * Main implementation of the Firestore client. This is the entry point for all Firestore @@ -408,6 +411,16 @@ public FirestoreRpc getClient() { return firestoreClient; } + @Override + public Duration getTotalRequestTimeout() { + return firestoreOptions.getRetrySettings().getTotalTimeout(); + } + + @Override + public ApiClock getClock() { + return NanoClock.getDefaultClock(); + } + /** Request funnel for all read/write requests. */ @Override public ApiFuture sendRequest( diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreRpcContext.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreRpcContext.java index 7e49e1a89..981d81fdd 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreRpcContext.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreRpcContext.java @@ -16,6 +16,7 @@ package com.google.cloud.firestore; +import com.google.api.core.ApiClock; import com.google.api.core.ApiFuture; import com.google.api.core.InternalApi; import com.google.api.core.InternalExtensionOnly; @@ -24,6 +25,7 @@ import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.UnaryCallable; import com.google.cloud.firestore.spi.v1.FirestoreRpc; +import org.threeten.bp.Duration; @InternalApi @InternalExtensionOnly @@ -37,6 +39,10 @@ interface FirestoreRpcContext { FirestoreRpc getClient(); + Duration getTotalRequestTimeout(); + + ApiClock getClock(); + ApiFuture sendRequest( RequestT requestT, UnaryCallable callable); diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java index 3cf86a81e..8b5e87693 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java @@ -67,6 +67,7 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.threeten.bp.Duration; /** * A Query which you can read or listen to. You can also construct refined Query objects by adding @@ -1342,6 +1343,7 @@ public void onCompleted() { responseObserver.onCompleted(); } }, + /* startTimeNanos= */ rpcContext.getClock().nanoTime(), /* transactionId= */ null, /* readTime= */ null); } @@ -1478,6 +1480,7 @@ Timestamp getReadTime() { private void internalStream( final QuerySnapshotObserver documentObserver, + final long startTimeNanos, @Nullable final ByteString transactionId, @Nullable final Timestamp readTime) { RunQueryRequest.Builder request = RunQueryRequest.newBuilder(); @@ -1533,30 +1536,20 @@ public void onNext(RunQueryResponse response) { @Override public void onError(Throwable throwable) { - // If a non-transactional query failed, attempt to restart. - // Transactional queries are retried via the transaction runner. - if (transactionId == null && isRetryableError(throwable)) { + QueryDocumentSnapshot cursor = lastReceivedDocument.get(); + if (shouldRetry(cursor, throwable)) { Tracing.getTracer() .getCurrentSpan() .addAnnotation("Firestore.Query: Retryable Error"); - // Restart the query but use the last document we received as the - // query cursor. Note that this it is ok to not use backoff here - // since we are requiring at least a single document result. - QueryDocumentSnapshot cursor = lastReceivedDocument.get(); - if (cursor != null) { - if (options.getRequireConsistency()) { - Query.this - .startAfter(cursor) - .internalStream( - documentObserver, /* transactionId= */ null, cursor.getReadTime()); - } else { - Query.this - .startAfter(cursor) - .internalStream( - documentObserver, /* transactionId= */ null, /* readTime= */ null); - } - } + Query.this + .startAfter(cursor) + .internalStream( + documentObserver, + startTimeNanos, + /* transactionId= */ null, + options.getRequireConsistency() ? cursor.getReadTime() : null); + } else { Tracing.getTracer().getCurrentSpan().addAnnotation("Firestore.Query: Error"); documentObserver.onError(throwable); @@ -1573,6 +1566,30 @@ public void onCompleted() { "numDocuments", AttributeValue.longAttributeValue(numDocuments))); documentObserver.onCompleted(readTime); } + + boolean shouldRetry(DocumentSnapshot lastDocument, Throwable t) { + if (transactionId != null) { + // Transactional queries are retried via the transaction runner. + return false; + } + + if (lastDocument == null) { + // Only retry if we have received a single result. Retries for RPCs with initial + // failure are handled by Google Gax, which also implements backoff. + return false; + } + + if (!isRetryableError(t)) { + return false; + } + + if (rpcContext.getTotalRequestTimeout().isZero()) { + return true; + } + + Duration duration = Duration.ofNanos(rpcContext.getClock().nanoTime() - startTimeNanos); + return duration.compareTo(rpcContext.getTotalRequestTimeout()) < 0; + } }; rpcContext.streamRequest(request.build(), observer, rpcContext.getClient().runQueryCallable()); @@ -1642,6 +1659,7 @@ public void onCompleted() { result.set(querySnapshot); } }, + /* startTimeNanos= */ rpcContext.getClock().nanoTime(), transactionId, /* readTime= */ null); diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/QueryTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/QueryTest.java index a1c4ff178..23673507f 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/QueryTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/QueryTest.java @@ -34,10 +34,13 @@ import static com.google.cloud.firestore.LocalFirestoreHelper.unaryFilter; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import com.google.api.core.ApiClock; import com.google.api.gax.rpc.ApiStreamObserver; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.cloud.Timestamp; @@ -60,6 +63,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; import org.junit.Before; import org.junit.Test; @@ -71,10 +75,29 @@ import org.mockito.Spy; import org.mockito.runners.MockitoJUnitRunner; import org.mockito.stubbing.Answer; +import org.threeten.bp.Duration; @RunWith(MockitoJUnitRunner.class) public class QueryTest { + static class MockClock implements ApiClock { + long nanoTime = 0; + + public void advance(long nanos) { + nanoTime += nanos; + } + + @Override + public long nanoTime() { + return nanoTime; + } + + @Override + public long millisTime() { + return nanoTime / 1000000; + } + } + @Spy private final FirestoreImpl firestoreMock = new FirestoreImpl( @@ -87,8 +110,14 @@ public class QueryTest { private Query query; + private MockClock clock; + @Before public void before() { + clock = new MockClock(); + doReturn(clock).when(firestoreMock).getClock(); + doReturn(Duration.ZERO).when(firestoreMock).getTotalRequestTimeout(); + query = firestoreMock.collection(COLLECTION_ID); } @@ -1025,7 +1054,86 @@ public void onCompleted() {} semaphore.acquire(); // Verify the request count - List requests = runQuery.getAllValues(); + assertEquals(1, runQuery.getAllValues().size()); + } + + @Test + public void onlyRetriesWhenResultSent() throws Exception { + doAnswer( + queryResponse( + FirestoreException.forServerRejection( + Status.DEADLINE_EXCEEDED, "Simulated test failure"))) + .when(firestoreMock) + .streamRequest( + runQuery.capture(), + streamObserverCapture.capture(), + Matchers.any()); + + assertThrows(ExecutionException.class, () -> query.get().get()); + + // Verify the request count + assertEquals(1, runQuery.getAllValues().size()); + } + + @Test + public void retriesWithoutTimeout() throws Exception { + final boolean[] returnError = new boolean[] {true}; + + doAnswer( + (Answer) + invocation -> { + // Advance clock by an hour + clock.advance(Duration.ofHours(1).toNanos()); + + if (returnError[0]) { + returnError[0] = false; + return queryResponse( + FirestoreException.forServerRejection( + Status.DEADLINE_EXCEEDED, "Simulated test failure"), + DOCUMENT_NAME + "1") + .answer(invocation); + } else { + return queryResponse(DOCUMENT_NAME + "2").answer(invocation); + } + }) + .when(firestoreMock) + .streamRequest( + runQuery.capture(), + streamObserverCapture.capture(), + Matchers.any()); + + query.get().get(); + + // Verify the request count + assertEquals(2, runQuery.getAllValues().size()); + } + + @Test + public void doesNotRetryWithTimeout() { + doReturn(Duration.ofMinutes(1)).when(firestoreMock).getTotalRequestTimeout(); + + doAnswer( + (Answer) + invocation -> { + // Advance clock by an hour + clock.advance(Duration.ofHours(1).toNanos()); + + return queryResponse( + FirestoreException.forServerRejection( + Status.DEADLINE_EXCEEDED, "Simulated test failure"), + DOCUMENT_NAME + "1", + DOCUMENT_NAME + "2") + .answer(invocation); + }) + .when(firestoreMock) + .streamRequest( + runQuery.capture(), + streamObserverCapture.capture(), + Matchers.any()); + + assertThrows(ExecutionException.class, () -> query.get().get()); + + // Verify the request count assertEquals(1, runQuery.getAllValues().size()); } diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java index 5100644e2..f384d0f0a 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java @@ -1769,7 +1769,7 @@ public void testRecursiveDeleteWithCustomBulkWriterInstance() throws Exception { } @Test - public void testEnforcesTimeouts() throws Exception { + public void testEnforcesTimeouts() { FirestoreOptions firestoreOptions = FirestoreOptions.newBuilder() .setRetrySettings(