Skip to content

Commit

Permalink
feat: add exponential backoff to stream retries (#34)
Browse files (browse the repository at this point in the history)
* feat: add exponential backoff to stream retries

* Update RetryingConnectionImpl.java

* fix requested changes
  • Loading branch information
hannahrogers-google committed May 14, 2020
1 parent ba4f5b8 commit bdb0995
Showing 1 changed file with 22 additions and 2 deletions.
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.pubsublite.internal.wire;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

import com.google.api.core.AbstractApiService;
Expand All @@ -27,7 +28,11 @@
import io.grpc.StatusException;
import io.grpc.stub.StreamObserver;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.concurrent.GuardedBy;
import org.threeten.bp.Duration;

/**
* A connection which recreates an underlying stream on retryable errors.
Expand All @@ -43,16 +48,23 @@ class RetryingConnectionImpl<
implements RetryingConnection<ConnectionT>, StreamObserver<ClientResponseT> {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

// Delay used for the first reconnect attempt; the backoff is reset to this value in onNext.
private static final Duration INITIAL_RECONNECT_BACKOFF_TIME = Duration.ofMillis(10);
// Upper bound applied to the backoff each time it is doubled in onError.
private static final Duration MAX_RECONNECT_BACKOFF_TIME = Duration.ofSeconds(10);

private final StreamFactory<StreamRequestT, StreamResponseT> streamFactory;
private final SingleConnectionFactory<
StreamRequestT, StreamResponseT, ClientResponseT, ConnectionT>
connectionFactory;
private final StreamRequestT initialRequest;
private final RetryingConnectionObserver<ClientResponseT> observer;
private final ScheduledExecutorService systemExecutor;

// connectionMonitor will not be held in any upcalls.
private final CloseableMonitor connectionMonitor = new CloseableMonitor();

// Delay, in milliseconds, to wait before the next reconnect attempt. onError reads this value as
// the delay for the retry it schedules, then doubles it (capped at MAX_RECONNECT_BACKOFF_TIME);
// onNext resets it to the initial value.
@GuardedBy("connectionMonitor.monitor")
private long nextRetryBackoffDuration = INITIAL_RECONNECT_BACKOFF_TIME.toMillis();

@GuardedBy("connectionMonitor.monitor")
private ConnectionT currentConnection;

Expand All @@ -69,6 +81,7 @@ class RetryingConnectionImpl<
this.connectionFactory = connectionFactory;
this.initialRequest = initialRequest;
this.observer = observer;
this.systemExecutor = Executors.newSingleThreadScheduledExecutor();
}

@Override
Expand All @@ -95,6 +108,7 @@ protected void doStop() {
notifyFailed(e);
return;
}
systemExecutor.shutdownNow();
notifyStopped();
}

Expand Down Expand Up @@ -124,6 +138,7 @@ public final void onNext(ClientResponseT value) {
Status status;
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
if (completed) return;
nextRetryBackoffDuration = INITIAL_RECONNECT_BACKOFF_TIME.toMillis();
}
status = observer.onClientResponse(value);
if (!status.isOk()) {
Expand All @@ -144,8 +159,11 @@ public final void onError(Throwable t) {
return;
}
Optional<Throwable> throwable = Optional.empty();
long backoffTime = 0;
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
currentConnection.close();
backoffTime = nextRetryBackoffDuration;
nextRetryBackoffDuration = Math.min(backoffTime * 2, MAX_RECONNECT_BACKOFF_TIME.toMillis());
} catch (Exception e) {
throwable = Optional.of(e);
}
Expand All @@ -157,8 +175,10 @@ public final void onError(Throwable t) {
.asRuntimeException());
return;
}
logger.atInfo().atMostEvery(30, SECONDS).log("Stream disconnected, attempting retry");
observer.triggerReinitialize();
logger.atInfo().withCause(t).atMostEvery(30, SECONDS).log(
"Stream disconnected attempting retry, after %s milliseconds", backoffTime);
ScheduledFuture<?> retry =
systemExecutor.schedule(observer::triggerReinitialize, backoffTime, MILLISECONDS);
}

@Override
Expand Down

0 comments on commit bdb0995

Please sign in to comment.