Navigation Menu

Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Commit

Permalink
feat: wrap non-retryable RPCs in retry machinery (#1328)
Browse files Browse the repository at this point in the history
* feat: wrap non-retryable RPCs in retry machinery

* address feedback

* add contextual retry settings to timeout tests

* add tracing test for non-retryable callable

* add server streaming callable tests
  • Loading branch information
noahdietz committed Apr 5, 2021
1 parent 751ccf3 commit 51c40ab
Show file tree
Hide file tree
Showing 4 changed files with 395 additions and 35 deletions.
130 changes: 110 additions & 20 deletions gax-grpc/src/test/java/com/google/api/gax/grpc/TimeoutTest.java
Expand Up @@ -74,9 +74,12 @@ public class TimeoutTest {
private static final int DEADLINE_IN_MINUTES = 10;
private static final int DEADLINE_IN_SECONDS = 20;
private static final ImmutableSet<StatusCode.Code> emptyRetryCodes = ImmutableSet.of();
private static final ImmutableSet<StatusCode.Code> retryUnknownCode =
ImmutableSet.of(StatusCode.Code.UNKNOWN);
private static final Duration totalTimeout = Duration.ofDays(DEADLINE_IN_DAYS);
private static final Duration maxRpcTimeout = Duration.ofMinutes(DEADLINE_IN_MINUTES);
private static final Duration initialRpcTimeout = Duration.ofSeconds(DEADLINE_IN_SECONDS);
private static final GrpcCallContext defaultCallContext = GrpcCallContext.createDefault();

@Rule public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
@Mock private Marshaller<String> stringMarshaller;
Expand All @@ -97,7 +100,8 @@ public void testNonRetryUnarySettings() {
.setRpcTimeoutMultiplier(1.0)
.setMaxRpcTimeout(maxRpcTimeout)
.build();
CallOptions callOptionsUsed = setupUnaryCallable(retrySettings);
CallOptions callOptionsUsed =
setupUnaryCallable(retrySettings, emptyRetryCodes, defaultCallContext);

// Verify that the gRPC channel used the CallOptions with our custom timeout of ~2 Days.
assertThat(callOptionsUsed.getDeadline()).isNotNull();
Expand All @@ -108,6 +112,46 @@ public void testNonRetryUnarySettings() {
assertThat(callOptionsUsed.getAuthority()).isEqualTo(CALL_OPTIONS_AUTHORITY);
}

@Test
public void testNonRetryUnarySettingsContextWithRetry() {
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setTotalTimeout(totalTimeout)
.setInitialRetryDelay(Duration.ZERO)
.setRetryDelayMultiplier(1.0)
.setMaxRetryDelay(Duration.ZERO)
.setMaxAttempts(1)
.setJittered(true)
.setInitialRpcTimeout(initialRpcTimeout)
.setRpcTimeoutMultiplier(1.0)
.setMaxRpcTimeout(maxRpcTimeout)
.build();
Duration newTimeout = Duration.ofSeconds(5);
RetrySettings contextRetrySettings =
retrySettings
.toBuilder()
.setInitialRpcTimeout(newTimeout)
.setMaxRpcTimeout(newTimeout)
.setMaxAttempts(3)
.build();
GrpcCallContext retryingContext =
defaultCallContext
.withRetrySettings(contextRetrySettings)
.withRetryableCodes(retryUnknownCode);
CallOptions callOptionsUsed =
setupUnaryCallable(retrySettings, emptyRetryCodes, retryingContext);

// Verify that the gRPC channel used the CallOptions the initial timeout of ~5 seconds.
// This indicates that the context retry settings were used on a callable that was instantiated
// with non-retryable settings.
assertThat(callOptionsUsed.getDeadline()).isNotNull();
assertThat(callOptionsUsed.getDeadline())
.isGreaterThan(Deadline.after(newTimeout.toSecondsPart() - 1, TimeUnit.SECONDS));
assertThat(callOptionsUsed.getDeadline())
.isLessThan(Deadline.after(newTimeout.toSecondsPart(), TimeUnit.SECONDS));
assertThat(callOptionsUsed.getAuthority()).isEqualTo(CALL_OPTIONS_AUTHORITY);
}

@Test
public void testNonRetryUnarySettingsWithoutInitialRpcTimeout() {
RetrySettings retrySettings =
Expand All @@ -121,7 +165,8 @@ public void testNonRetryUnarySettingsWithoutInitialRpcTimeout() {
.setRpcTimeoutMultiplier(1.0)
.setMaxRpcTimeout(maxRpcTimeout)
.build();
CallOptions callOptionsUsed = setupUnaryCallable(retrySettings);
CallOptions callOptionsUsed =
setupUnaryCallable(retrySettings, emptyRetryCodes, defaultCallContext);

// Verify that the gRPC channel used the CallOptions with our custom timeout of ~2 Days.
assertThat(callOptionsUsed.getDeadline()).isNotNull();
Expand All @@ -145,7 +190,8 @@ public void testNonRetryUnarySettingsWithoutIndividualRpcTimeout() {
.setRpcTimeoutMultiplier(1.0)
.setRpcTimeoutMultiplier(1.0)
.build();
CallOptions callOptionsUsed = setupUnaryCallable(retrySettings);
CallOptions callOptionsUsed =
setupUnaryCallable(retrySettings, emptyRetryCodes, defaultCallContext);

// Verify that the gRPC channel used the CallOptions with our custom timeout of ~2 Days.
assertThat(callOptionsUsed.getDeadline()).isNotNull();
Expand All @@ -170,7 +216,8 @@ public void testNonRetryServerStreamingSettings() {
.setRpcTimeoutMultiplier(1.0)
.setMaxRpcTimeout(maxRpcTimeout)
.build();
CallOptions callOptionsUsed = setupServerStreamingCallable(retrySettings);
CallOptions callOptionsUsed =
setupServerStreamingCallable(retrySettings, emptyRetryCodes, defaultCallContext);

// Verify that the gRPC channel used the CallOptions with our custom timeout of ~2 Days.
assertThat(callOptionsUsed.getDeadline()).isNotNull();
Expand All @@ -181,6 +228,41 @@ public void testNonRetryServerStreamingSettings() {
assertThat(callOptionsUsed.getAuthority()).isEqualTo(CALL_OPTIONS_AUTHORITY);
}

@Test
public void testNonRetryServerStreamingSettingsContextWithRetry() {
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setTotalTimeout(totalTimeout)
.setInitialRetryDelay(Duration.ZERO)
.setRetryDelayMultiplier(1.0)
.setMaxRetryDelay(Duration.ZERO)
.setMaxAttempts(1)
.setJittered(true)
.setInitialRpcTimeout(initialRpcTimeout)
.setRpcTimeoutMultiplier(1.0)
.setMaxRpcTimeout(maxRpcTimeout)
.build();
Duration newTimeout = Duration.ofSeconds(5);
RetrySettings contextRetrySettings =
retrySettings.toBuilder().setTotalTimeout(newTimeout).setMaxAttempts(3).build();
GrpcCallContext retryingContext =
defaultCallContext
.withRetrySettings(contextRetrySettings)
.withRetryableCodes(retryUnknownCode);
CallOptions callOptionsUsed =
setupServerStreamingCallable(retrySettings, emptyRetryCodes, retryingContext);

// Verify that the gRPC channel used the CallOptions the total timeout of ~5 seconds.
// This indicates that the context retry settings were used on a callable that was instantiated
// with non-retryable settings.
assertThat(callOptionsUsed.getDeadline()).isNotNull();
assertThat(callOptionsUsed.getDeadline())
.isGreaterThan(Deadline.after(newTimeout.toSecondsPart() - 1, TimeUnit.SECONDS));
assertThat(callOptionsUsed.getDeadline())
.isLessThan(Deadline.after(newTimeout.toSecondsPart(), TimeUnit.SECONDS));
assertThat(callOptionsUsed.getAuthority()).isEqualTo(CALL_OPTIONS_AUTHORITY);
}

@Test
public void testNonRetryServerStreamingSettingsWithoutInitialRpcTimeout() {
RetrySettings retrySettings =
Expand All @@ -194,7 +276,8 @@ public void testNonRetryServerStreamingSettingsWithoutInitialRpcTimeout() {
.setRpcTimeoutMultiplier(1.0)
.setMaxRpcTimeout(maxRpcTimeout)
.build();
CallOptions callOptionsUsed = setupServerStreamingCallable(retrySettings);
CallOptions callOptionsUsed =
setupServerStreamingCallable(retrySettings, emptyRetryCodes, defaultCallContext);

// Verify that the gRPC channel used the CallOptions with our custom timeout of ~2 Days.
assertThat(callOptionsUsed.getDeadline()).isNotNull();
Expand All @@ -218,7 +301,8 @@ public void testNonRetryServerStreamingSettingsWithoutIndividualRpcTimeout() {
.setRpcTimeoutMultiplier(1.0)
.setRpcTimeoutMultiplier(1.0)
.build();
CallOptions callOptionsUsed = setupServerStreamingCallable(retrySettings);
CallOptions callOptionsUsed =
setupServerStreamingCallable(retrySettings, emptyRetryCodes, defaultCallContext);

// Verify that the gRPC channel used the CallOptions with our custom timeout of ~2 Days.
assertThat(callOptionsUsed.getDeadline()).isNotNull();
Expand All @@ -229,7 +313,10 @@ public void testNonRetryServerStreamingSettingsWithoutIndividualRpcTimeout() {
assertThat(callOptionsUsed.getAuthority()).isEqualTo(CALL_OPTIONS_AUTHORITY);
}

private CallOptions setupUnaryCallable(RetrySettings retrySettings) {
private CallOptions setupUnaryCallable(
RetrySettings retrySettings,
ImmutableSet<StatusCode.Code> retryableCodes,
GrpcCallContext callContext) {
MethodDescriptor<String, String> methodDescriptor =
MethodDescriptor.<String, String>newBuilder()
.setSchemaDescriptor("yaml")
Expand All @@ -248,8 +335,8 @@ private CallOptions setupUnaryCallable(RetrySettings retrySettings) {
// Clobber the "authority" property with an identifier that allows us to trace
// the use of this CallOptions variable.
CallOptions spyCallOptions = CallOptions.DEFAULT.withAuthority("RETRYING_TEST");
GrpcCallContext grpcCallContext =
GrpcCallContext.createDefault().withChannel(managedChannel).withCallOptions(spyCallOptions);
GrpcCallContext context =
callContext.withChannel(managedChannel).withCallOptions(spyCallOptions);

ArgumentCaptor<CallOptions> callOptionsArgumentCaptor =
ArgumentCaptor.forClass(CallOptions.class);
Expand All @@ -266,16 +353,16 @@ private CallOptions setupUnaryCallable(RetrySettings retrySettings) {
.setMethodDescriptor(methodDescriptor)
.setParamsExtractor(paramsExtractor)
.build();
UnaryCallSettings<String, String> nonRetriedCallSettings =
UnaryCallSettings<String, String> unaryCallSettings =
UnaryCallSettings.<String, String>newUnaryCallSettingsBuilder()
.setRetrySettings(retrySettings)
.setRetryableCodes(emptyRetryCodes)
.setRetryableCodes(retryableCodes)
.build();
UnaryCallable<String, String> callable =
GrpcCallableFactory.createUnaryCallable(
grpcCallSettings,
nonRetriedCallSettings,
ClientContext.newBuilder().setDefaultCallContext(grpcCallContext).build());
unaryCallSettings,
ClientContext.newBuilder().setDefaultCallContext(context).build());

try {
ApiFuture<String> future = callable.futureCall("Is your refrigerator running?");
Expand All @@ -287,7 +374,10 @@ private CallOptions setupUnaryCallable(RetrySettings retrySettings) {
return callOptionsArgumentCaptor.getValue();
}

private CallOptions setupServerStreamingCallable(RetrySettings retrySettings) {
private CallOptions setupServerStreamingCallable(
RetrySettings retrySettings,
ImmutableSet<StatusCode.Code> retryableCodes,
GrpcCallContext callContext) {
MethodDescriptor<String, String> methodDescriptor =
MethodDescriptor.<String, String>newBuilder()
.setSchemaDescriptor("yaml")
Expand All @@ -306,8 +396,8 @@ private CallOptions setupServerStreamingCallable(RetrySettings retrySettings) {
// Clobber the "authority" property with an identifier that allows us to trace
// the use of this CallOptions variable.
CallOptions spyCallOptions = CallOptions.DEFAULT.withAuthority("RETRYING_TEST");
GrpcCallContext grpcCallContext =
GrpcCallContext.createDefault().withChannel(managedChannel).withCallOptions(spyCallOptions);
GrpcCallContext context =
callContext.withChannel(managedChannel).withCallOptions(spyCallOptions);

ArgumentCaptor<CallOptions> callOptionsArgumentCaptor =
ArgumentCaptor.forClass(CallOptions.class);
Expand All @@ -324,16 +414,16 @@ private CallOptions setupServerStreamingCallable(RetrySettings retrySettings) {
.setMethodDescriptor(methodDescriptor)
.setParamsExtractor(paramsExtractor)
.build();
ServerStreamingCallSettings<String, String> nonRetriedCallSettings =
ServerStreamingCallSettings<String, String> serverStreamingCallSettings =
ServerStreamingCallSettings.<String, String>newBuilder()
.setRetrySettings(retrySettings)
.setRetryableCodes(emptyRetryCodes)
.setRetryableCodes(retryableCodes)
.build();
ServerStreamingCallable<String, String> callable =
GrpcCallableFactory.createServerStreamingCallable(
grpcCallSettings,
nonRetriedCallSettings,
ClientContext.newBuilder().setDefaultCallContext(grpcCallContext).build());
serverStreamingCallSettings,
ClientContext.newBuilder().setDefaultCallContext(context).build());

try {
ServerStream<String> stream = callable.call("Is your refrigerator running?");
Expand Down
33 changes: 18 additions & 15 deletions gax/src/main/java/com/google/api/gax/rpc/Callables.java
Expand Up @@ -56,19 +56,21 @@ public static <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> retrying(
UnaryCallSettings<?, ?> callSettings,
ClientContext clientContext) {

if (areRetriesDisabled(callSettings.getRetryableCodes(), callSettings.getRetrySettings())) {
UnaryCallSettings<?, ?> settings = callSettings;

if (areRetriesDisabled(settings.getRetryableCodes(), settings.getRetrySettings())) {
// When retries are disabled, the total timeout can be treated as the rpc timeout.
return innerCallable.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withTimeout(callSettings.getRetrySettings().getTotalTimeout()));
settings =
settings
.toBuilder()
.setSimpleTimeoutNoRetries(settings.getRetrySettings().getTotalTimeout())
.build();
}

RetryAlgorithm<ResponseT> retryAlgorithm =
new RetryAlgorithm<>(
new ApiResultRetryAlgorithm<ResponseT>(),
new ExponentialRetryAlgorithm(
callSettings.getRetrySettings(), clientContext.getClock()));
new ExponentialRetryAlgorithm(settings.getRetrySettings(), clientContext.getClock()));
ScheduledRetryingExecutor<ResponseT> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());
return new RetryingCallable<>(
Expand All @@ -81,25 +83,26 @@ public static <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT>
ServerStreamingCallSettings<RequestT, ResponseT> callSettings,
ClientContext clientContext) {

if (areRetriesDisabled(callSettings.getRetryableCodes(), callSettings.getRetrySettings())) {
ServerStreamingCallSettings<RequestT, ResponseT> settings = callSettings;
if (areRetriesDisabled(settings.getRetryableCodes(), settings.getRetrySettings())) {
// When retries are disabled, the total timeout can be treated as the rpc timeout.
return innerCallable.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withTimeout(callSettings.getRetrySettings().getTotalTimeout()));
settings =
settings
.toBuilder()
.setSimpleTimeoutNoRetries(settings.getRetrySettings().getTotalTimeout())
.build();
}

StreamingRetryAlgorithm<Void> retryAlgorithm =
new StreamingRetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
new ExponentialRetryAlgorithm(
callSettings.getRetrySettings(), clientContext.getClock()));
new ExponentialRetryAlgorithm(settings.getRetrySettings(), clientContext.getClock()));

ScheduledRetryingExecutor<Void> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());

return new RetryingServerStreamingCallable<>(
innerCallable, retryingExecutor, callSettings.getResumptionStrategy());
innerCallable, retryingExecutor, settings.getResumptionStrategy());
}

@BetaApi("The surface for streaming is not stable yet and may change in the future.")
Expand Down

0 comments on commit 51c40ab

Please sign in to comment.