Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use streaming retry settings for ResumableStreamIterator #49

Merged
merged 2 commits into from Jan 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -17,15 +17,19 @@
package com.google.cloud.spanner;

import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerExceptionForCancellation;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.ByteArray;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.AbstractIterator;
Expand All @@ -46,6 +50,7 @@
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import java.io.IOException;
import java.io.Serializable;
import java.util.AbstractList;
import java.util.ArrayList;
Expand All @@ -55,7 +60,10 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -820,8 +828,10 @@ void setCall(SpannerRpc.StreamingCall call) {
@VisibleForTesting
abstract static class ResumableStreamIterator extends AbstractIterator<PartialResultSet>
implements CloseableIterator<PartialResultSet> {
private static final RetrySettings STREAMING_RETRY_SETTINGS =
SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetrySettings();
private static final Logger logger = Logger.getLogger(ResumableStreamIterator.class.getName());
private final BackOff backOff = SpannerImpl.newBackOff();
private final BackOff backOff = newBackOff();
private final LinkedList<PartialResultSet> buffer = new LinkedList<>();
private final int maxBufferSize;
private final Span span;
Expand All @@ -841,6 +851,70 @@ protected ResumableStreamIterator(int maxBufferSize, String streamName, Span par
this.span = tracer.spanBuilderWithExplicitParent(streamName, parent).startSpan();
}

private static ExponentialBackOff newBackOff() {
return new ExponentialBackOff.Builder()
.setMultiplier(STREAMING_RETRY_SETTINGS.getRetryDelayMultiplier())
.setInitialIntervalMillis(
(int) STREAMING_RETRY_SETTINGS.getInitialRetryDelay().toMillis())
.setMaxIntervalMillis((int) STREAMING_RETRY_SETTINGS.getMaxRetryDelay().toMillis())
.setMaxElapsedTimeMillis(Integer.MAX_VALUE) // Prevent Backoff.STOP from getting returned.
.build();
}

private static void backoffSleep(Context context, BackOff backoff) throws SpannerException {
backoffSleep(context, nextBackOffMillis(backoff));
}

private static long nextBackOffMillis(BackOff backoff) throws SpannerException {
try {
return backoff.nextBackOffMillis();
} catch (IOException e) {
throw newSpannerException(ErrorCode.INTERNAL, e.getMessage(), e);
}
}

private static void backoffSleep(Context context, long backoffMillis) throws SpannerException {
tracer
.getCurrentSpan()
.addAnnotation(
"Backing off",
ImmutableMap.of("Delay", AttributeValue.longAttributeValue(backoffMillis)));
final CountDownLatch latch = new CountDownLatch(1);
final Context.CancellationListener listener =
new Context.CancellationListener() {
@Override
public void cancelled(Context context) {
// Wakeup on cancellation / DEADLINE_EXCEEDED.
latch.countDown();
}
};

context.addListener(listener, DirectExecutor.INSTANCE);
try {
if (backoffMillis == BackOff.STOP) {
// Highly unlikely but we handle it just in case.
backoffMillis = STREAMING_RETRY_SETTINGS.getMaxRetryDelay().toMillis();
}
if (latch.await(backoffMillis, TimeUnit.MILLISECONDS)) {
// Woken by context cancellation.
throw newSpannerExceptionForCancellation(context, null);
}
} catch (InterruptedException interruptExcept) {
throw newSpannerExceptionForCancellation(context, interruptExcept);
} finally {
context.removeListener(listener);
}
}

private enum DirectExecutor implements Executor {
INSTANCE;

@Override
public void execute(Runnable command) {
command.run();
}
}

abstract CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken);

@Override
Expand Down Expand Up @@ -915,9 +989,9 @@ protected PartialResultSet computeNext() {
try (Scope s = tracer.withSpan(span)) {
long delay = e.getRetryDelayInMillis();
if (delay != -1) {
SpannerImpl.backoffSleep(context, delay);
backoffSleep(context, delay);
} else {
SpannerImpl.backoffSleep(context, backOff);
backoffSleep(context, backOff);
}
}
continue;
Expand Down
Expand Up @@ -16,11 +16,6 @@

package com.google.cloud.spanner;

import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerExceptionForCancellation;

import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.gax.paging.Page;
import com.google.cloud.BaseService;
import com.google.cloud.PageImpl;
Expand All @@ -32,32 +27,22 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.Context;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

/** Default implementation of the Cloud Spanner interface. */
class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
private static final int MIN_BACKOFF_MS = 1000;
private static final int MAX_BACKOFF_MS = 32000;

private static final Logger logger = Logger.getLogger(SpannerImpl.class.getName());
static final Tracer tracer = Tracing.getTracer();

Expand Down Expand Up @@ -101,59 +86,6 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
this(options.getSpannerRpcV1(), options);
}

static ExponentialBackOff newBackOff() {
return new ExponentialBackOff.Builder()
.setInitialIntervalMillis(MIN_BACKOFF_MS)
.setMaxIntervalMillis(MAX_BACKOFF_MS)
.setMaxElapsedTimeMillis(Integer.MAX_VALUE) // Prevent Backoff.STOP from getting returned.
.build();
}

static void backoffSleep(Context context, BackOff backoff) throws SpannerException {
backoffSleep(context, nextBackOffMillis(backoff));
}

static long nextBackOffMillis(BackOff backoff) throws SpannerException {
try {
return backoff.nextBackOffMillis();
} catch (IOException e) {
throw newSpannerException(ErrorCode.INTERNAL, e.getMessage(), e);
}
}

static void backoffSleep(Context context, long backoffMillis) throws SpannerException {
tracer
.getCurrentSpan()
.addAnnotation(
"Backing off",
ImmutableMap.of("Delay", AttributeValue.longAttributeValue(backoffMillis)));
final CountDownLatch latch = new CountDownLatch(1);
final Context.CancellationListener listener =
new Context.CancellationListener() {
@Override
public void cancelled(Context context) {
// Wakeup on cancellation / DEADLINE_EXCEEDED.
latch.countDown();
}
};

context.addListener(listener, DirectExecutor.INSTANCE);
try {
if (backoffMillis == BackOff.STOP) {
// Highly unlikely but we handle it just in case.
backoffMillis = MAX_BACKOFF_MS;
}
if (latch.await(backoffMillis, TimeUnit.MILLISECONDS)) {
// Woken by context cancellation.
throw newSpannerExceptionForCancellation(context, null);
}
} catch (InterruptedException interruptExcept) {
throw newSpannerExceptionForCancellation(context, interruptExcept);
} finally {
context.removeListener(listener);
}
}

/** Returns the {@link SpannerRpc} of this {@link SpannerImpl} instance. */
SpannerRpc getRpc() {
return gapicRpc;
Expand Down Expand Up @@ -286,13 +218,4 @@ void setNextPageToken(String nextPageToken) {

abstract S fromProto(T proto);
}

private enum DirectExecutor implements Executor {
INSTANCE;

@Override
public void execute(Runnable command) {
command.run();
}
}
}
Expand Up @@ -21,7 +21,6 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.api.client.util.BackOff;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
Expand Down Expand Up @@ -153,17 +152,6 @@ boolean isAborted() {
}
}

/** Return the delay in milliseconds between requests to Cloud Spanner. */
long getRetryDelayInMillis(BackOff backoff) {
long delay = SpannerImpl.nextBackOffMillis(backoff);
synchronized (lock) {
if (retryDelayInMillis >= 0) {
return retryDelayInMillis;
}
}
return delay;
}

void rollback() {
// We're exiting early due to a user exception, but the transaction is still active.
// Send a rollback for the transaction to release any locks held.
Expand Down