Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Commit

Permalink
feat: use context retry settings for streaming calls
Browse files Browse the repository at this point in the history
  • Loading branch information
olavloite committed Nov 11, 2020
1 parent 83c2f16 commit aabd3b0
Show file tree
Hide file tree
Showing 8 changed files with 522 additions and 12 deletions.
@@ -0,0 +1,110 @@
/*
* Copyright 2018 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.retrying;

import com.google.api.core.InternalApi;
import java.util.concurrent.CancellationException;

/**
* The streaming retry algorithm, which makes decision based either on the thrown exception and the
* execution time settings of the previous attempt. This extends {@link RetryAlgorithm} to take
* additional information (provided by {@code ServerStreamingAttemptCallable}) into account.
*
* <p>This class is thread-safe.
*
* <p>Internal use only - public for technical reasons.
*/
@InternalApi("For internal use only")
public final class ContextAwareStreamingRetryAlgorithm<ResponseT>
extends ContextAwareRetryAlgorithm<ResponseT> {
public ContextAwareStreamingRetryAlgorithm(
ContextAwareResultRetryAlgorithm<ResponseT> resultAlgorithm,
ContextAwareTimedRetryAlgorithm timedAlgorithm) {
super(resultAlgorithm, timedAlgorithm);
}

/**
* {@inheritDoc}
*
* <p>The attempt settings will be reset if the stream attempt produced any messages.
*/
public TimedAttemptSettings createNextAttempt(
RetryingContext context,
Throwable prevThrowable,
ResponseT prevResponse,
TimedAttemptSettings prevSettings) {

if (prevThrowable instanceof ServerStreamingAttemptException) {
ServerStreamingAttemptException attemptException =
(ServerStreamingAttemptException) prevThrowable;
prevThrowable = prevThrowable.getCause();

// If we have made progress in the last attempt, then reset the delays
if (attemptException.hasSeenResponses()) {
prevSettings =
createFirstAttempt(context)
.toBuilder()
.setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos())
.setOverallAttemptCount(prevSettings.getOverallAttemptCount())
.build();
}
}

return super.createNextAttempt(context, prevThrowable, prevResponse, prevSettings);
}

/**
* {@inheritDoc}
*
* <p>Ensures retries are only scheduled if the {@link StreamResumptionStrategy} in the {@code
* ServerStreamingAttemptCallable} supports it.
*/
@Override
public boolean shouldRetry(
RetryingContext context,
Throwable prevThrowable,
ResponseT prevResponse,
TimedAttemptSettings nextAttemptSettings)
throws CancellationException {

// Unwrap
if (prevThrowable instanceof ServerStreamingAttemptException) {
ServerStreamingAttemptException attemptException =
(ServerStreamingAttemptException) prevThrowable;
prevThrowable = prevThrowable.getCause();

if (!attemptException.canResume()) {
return false;
}
}

return super.shouldRetry(context, prevThrowable, prevResponse, nextAttemptSettings);
}
}
28 changes: 26 additions & 2 deletions gax/src/main/java/com/google/api/gax/rpc/ApiCallContext.java
Expand Up @@ -170,8 +170,27 @@ public interface ApiCallContext extends RetryingContext {
* Code#DEADLINE_EXCEEDED} as one of its retryable codes (or without calling {@link
* #withRetryableCodes(Set)} with a set that includes at least {@link Code#DEADLINE_EXCEEDED})
* will effectively only set a simple timeout that is equal to {@link
* RetrySettings#getInitialRpcTimeout()}. It is better to use {@link #withTimeout(Duration)} if
* that is the intended behavior.
* RetrySettings#getInitialRpcTimeout()}. It is recommended to use {@link #withTimeout(Duration)}
* if that is the intended behavior.
*
* <p>Example usage:
*
* <pre>{@code
* ApiCallContext context = GrpcCallContext.createDefault()
* .withRetrySettings(RetrySettings.newBuilder()
* .setInitialRetryDelay(Duration.ofMillis(10L))
* .setInitialRpcTimeout(Duration.ofMillis(100L))
* .setMaxAttempts(10)
* .setMaxRetryDelay(Duration.ofSeconds(10L))
* .setMaxRpcTimeout(Duration.ofSeconds(30L))
* .setRetryDelayMultiplier(1.4)
* .setRpcTimeoutMultiplier(1.5)
* .setTotalTimeout(Duration.ofMinutes(10L))
* .build())
* .withRetryableCodes(Sets.newSet(
* StatusCode.Code.UNAVAILABLE,
* StatusCode.Code.DEADLINE_EXCEEDED));
* }</pre>
*/
ApiCallContext withRetrySettings(RetrySettings retrySettings);

Expand All @@ -185,6 +204,11 @@ public interface ApiCallContext extends RetryingContext {
* <p>This sets the retryable codes to use for the RPC. These settings will work in combination
* with either the default {@link RetrySettings} for the RPC, or the {@link RetrySettings}
* supplied through {@link #withRetrySettings(RetrySettings)}.
*
* <p>Setting a non-empty set of retryable codes for an RPC that is not already retryable, will
* not have any effect and the RPC will NOT be retried. This option can only be used to change
* which codes are considered retryable for an RPC that already has at least one retryable code in
* its default settings.
*/
ApiCallContext withRetryableCodes(Set<StatusCode.Code> retryableCodes);

Expand Down
Expand Up @@ -35,11 +35,18 @@
/* Package-private for internal use. */
class ApiResultRetryAlgorithm<ResponseT> extends BasicResultRetryAlgorithm<ResponseT> {

/** Returns true if prevThrowable is an {@link ApiException} that is retryable. */
@Override
public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) {
return (prevThrowable instanceof ApiException) && ((ApiException) prevThrowable).isRetryable();
}

/**
* If {@link RetryingContext#getRetryableCodes()} is not null: Returns true if the status code of
* prevThrowable is in the list of retryable code of the {@link RetryingContext}.
*
* <p>Otherwise it returns the result of {@link #shouldRetry(Throwable, Object)}.
*/
@Override
public boolean shouldRetry(
RetryingContext context, Throwable prevThrowable, ResponseT prevResponse) {
Expand Down
10 changes: 5 additions & 5 deletions gax/src/main/java/com/google/api/gax/rpc/Callables.java
Expand Up @@ -35,11 +35,11 @@
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.retrying.ContextAwareRetryAlgorithm;
import com.google.api.gax.retrying.ContextAwareScheduledRetryingExecutor;
import com.google.api.gax.retrying.ContextAwareStreamingRetryAlgorithm;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.retrying.StreamingRetryAlgorithm;
import java.util.Collection;

/**
Expand Down Expand Up @@ -91,14 +91,14 @@ public static <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT>
.withTimeout(callSettings.getRetrySettings().getTotalTimeout()));
}

StreamingRetryAlgorithm<Void> retryAlgorithm =
new StreamingRetryAlgorithm<>(
ContextAwareStreamingRetryAlgorithm<Void> retryAlgorithm =
new ContextAwareStreamingRetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
new ExponentialRetryAlgorithm(
callSettings.getRetrySettings(), clientContext.getClock()));

ScheduledRetryingExecutor<Void> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());
ContextAwareScheduledRetryingExecutor<Void> retryingExecutor =
new ContextAwareScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());

return new RetryingServerStreamingCallable<>(
innerCallable, retryingExecutor, callSettings.getResumptionStrategy());
Expand Down

0 comments on commit aabd3b0

Please sign in to comment.