diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java index 1258bfe0e8..42fe9b7cb7 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java @@ -17,15 +17,19 @@ package com.google.cloud.spanner; import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException; +import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerExceptionForCancellation; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import com.google.api.client.util.BackOff; +import com.google.api.client.util.ExponentialBackOff; +import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.ByteArray; import com.google.cloud.Date; import com.google.cloud.Timestamp; import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.cloud.spanner.v1.stub.SpannerStubSettings; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.collect.AbstractIterator; @@ -46,6 +50,7 @@ import io.opencensus.trace.Span; import io.opencensus.trace.Tracer; import io.opencensus.trace.Tracing; +import java.io.IOException; import java.io.Serializable; import java.util.AbstractList; import java.util.ArrayList; @@ -55,7 +60,10 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -820,8 +828,10 @@ void setCall(SpannerRpc.StreamingCall call) { @VisibleForTesting abstract static class ResumableStreamIterator extends AbstractIterator implements CloseableIterator { + private static final RetrySettings STREAMING_RETRY_SETTINGS = + SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetrySettings(); private static final Logger logger = Logger.getLogger(ResumableStreamIterator.class.getName()); - private final BackOff backOff = SpannerImpl.newBackOff(); + private final BackOff backOff = newBackOff(); private final LinkedList buffer = new LinkedList<>(); private final int maxBufferSize; private final Span span; @@ -841,6 +851,70 @@ protected ResumableStreamIterator(int maxBufferSize, String streamName, Span par this.span = tracer.spanBuilderWithExplicitParent(streamName, parent).startSpan(); } + private static ExponentialBackOff newBackOff() { + return new ExponentialBackOff.Builder() + .setMultiplier(STREAMING_RETRY_SETTINGS.getRetryDelayMultiplier()) + .setInitialIntervalMillis( + (int) STREAMING_RETRY_SETTINGS.getInitialRetryDelay().toMillis()) + .setMaxIntervalMillis((int) STREAMING_RETRY_SETTINGS.getMaxRetryDelay().toMillis()) + .setMaxElapsedTimeMillis(Integer.MAX_VALUE) // Prevent Backoff.STOP from getting returned. + .build(); + } + + private static void backoffSleep(Context context, BackOff backoff) throws SpannerException { + backoffSleep(context, nextBackOffMillis(backoff)); + } + + private static long nextBackOffMillis(BackOff backoff) throws SpannerException { + try { + return backoff.nextBackOffMillis(); + } catch (IOException e) { + throw newSpannerException(ErrorCode.INTERNAL, e.getMessage(), e); + } + } + + private static void backoffSleep(Context context, long backoffMillis) throws SpannerException { + tracer + .getCurrentSpan() + .addAnnotation( + "Backing off", + ImmutableMap.of("Delay", AttributeValue.longAttributeValue(backoffMillis))); + final CountDownLatch latch = new CountDownLatch(1); + final Context.CancellationListener listener = + new Context.CancellationListener() { + @Override + public void cancelled(Context context) { + // Wakeup on cancellation / DEADLINE_EXCEEDED. + latch.countDown(); + } + }; + + context.addListener(listener, DirectExecutor.INSTANCE); + try { + if (backoffMillis == BackOff.STOP) { + // Highly unlikely but we handle it just in case. + backoffMillis = STREAMING_RETRY_SETTINGS.getMaxRetryDelay().toMillis(); + } + if (latch.await(backoffMillis, TimeUnit.MILLISECONDS)) { + // Woken by context cancellation. + throw newSpannerExceptionForCancellation(context, null); + } + } catch (InterruptedException interruptExcept) { + throw newSpannerExceptionForCancellation(context, interruptExcept); + } finally { + context.removeListener(listener); + } + } + + private enum DirectExecutor implements Executor { + INSTANCE; + + @Override + public void execute(Runnable command) { + command.run(); + } + } + abstract CloseableIterator startStream(@Nullable ByteString resumeToken); @Override @@ -915,9 +989,9 @@ protected PartialResultSet computeNext() { try (Scope s = tracer.withSpan(span)) { long delay = e.getRetryDelayInMillis(); if (delay != -1) { - SpannerImpl.backoffSleep(context, delay); + backoffSleep(context, delay); } else { - SpannerImpl.backoffSleep(context, backOff); + backoffSleep(context, backOff); } } continue; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index 22f3911b07..aa8abbacb4 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -16,11 +16,6 @@ package com.google.cloud.spanner; -import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException; -import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerExceptionForCancellation; - -import com.google.api.client.util.BackOff; -import com.google.api.client.util.ExponentialBackOff; import com.google.api.gax.paging.Page; import com.google.cloud.BaseService; import com.google.cloud.PageImpl; @@ -32,22 +27,15 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import io.grpc.Context; -import io.opencensus.trace.AttributeValue; import io.opencensus.trace.Tracer; import io.opencensus.trace.Tracing; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -55,9 +43,6 @@ /** Default implementation of the Cloud Spanner interface. */ class SpannerImpl extends BaseService implements Spanner { - private static final int MIN_BACKOFF_MS = 1000; - private static final int MAX_BACKOFF_MS = 32000; - private static final Logger logger = Logger.getLogger(SpannerImpl.class.getName()); static final Tracer tracer = Tracing.getTracer(); @@ -101,59 +86,6 @@ class SpannerImpl extends BaseService implements Spanner { this(options.getSpannerRpcV1(), options); } - static ExponentialBackOff newBackOff() { - return new ExponentialBackOff.Builder() - .setInitialIntervalMillis(MIN_BACKOFF_MS) - .setMaxIntervalMillis(MAX_BACKOFF_MS) - .setMaxElapsedTimeMillis(Integer.MAX_VALUE) // Prevent Backoff.STOP from getting returned. - .build(); - } - - static void backoffSleep(Context context, BackOff backoff) throws SpannerException { - backoffSleep(context, nextBackOffMillis(backoff)); - } - - static long nextBackOffMillis(BackOff backoff) throws SpannerException { - try { - return backoff.nextBackOffMillis(); - } catch (IOException e) { - throw newSpannerException(ErrorCode.INTERNAL, e.getMessage(), e); - } - } - - static void backoffSleep(Context context, long backoffMillis) throws SpannerException { - tracer - .getCurrentSpan() - .addAnnotation( - "Backing off", - ImmutableMap.of("Delay", AttributeValue.longAttributeValue(backoffMillis))); - final CountDownLatch latch = new CountDownLatch(1); - final Context.CancellationListener listener = - new Context.CancellationListener() { - @Override - public void cancelled(Context context) { - // Wakeup on cancellation / DEADLINE_EXCEEDED. - latch.countDown(); - } - }; - - context.addListener(listener, DirectExecutor.INSTANCE); - try { - if (backoffMillis == BackOff.STOP) { - // Highly unlikely but we handle it just in case. - backoffMillis = MAX_BACKOFF_MS; - } - if (latch.await(backoffMillis, TimeUnit.MILLISECONDS)) { - // Woken by context cancellation. - throw newSpannerExceptionForCancellation(context, null); - } - } catch (InterruptedException interruptExcept) { - throw newSpannerExceptionForCancellation(context, interruptExcept); - } finally { - context.removeListener(listener); - } - } - /** Returns the {@link SpannerRpc} of this {@link SpannerImpl} instance. */ SpannerRpc getRpc() { return gapicRpc; @@ -286,13 +218,4 @@ void setNextPageToken(String nextPageToken) { abstract S fromProto(T proto); } - - private enum DirectExecutor implements Executor { - INSTANCE; - - @Override - public void execute(Runnable command) { - command.run(); - } - } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 0ea97ca29b..aaf4267df4 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import com.google.api.client.util.BackOff; import com.google.cloud.Timestamp; import com.google.cloud.spanner.SessionImpl.SessionTransaction; import com.google.cloud.spanner.spi.v1.SpannerRpc; @@ -153,17 +152,6 @@ boolean isAborted() { } } - /** Return the delay in milliseconds between requests to Cloud Spanner. */ - long getRetryDelayInMillis(BackOff backoff) { - long delay = SpannerImpl.nextBackOffMillis(backoff); - synchronized (lock) { - if (retryDelayInMillis >= 0) { - return retryDelayInMillis; - } - } - return delay; - } - void rollback() { // We're exiting early due to a user exception, but the transaction is still active. // Send a rollback for the transaction to release any locks held.