Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Commit

Permalink
feat: support retry settings and retryable codes in call context (#1238)
Browse files Browse the repository at this point in the history
* feat: support retry settings and retryable codes in call context

Allows applications to supply retry settings and retryabe codes for individual RPCs
through ApiCallContext.

Fixes #1197

* chore: fix formatting and license headers

* feat: use context retry settings for streaming calls

* fix: process review comments

* fix: missed one Thread.sleep

* fix: fix style

* fix: address review comments

* refactor: remove ContextAwareResultRetryAlgorithm

* refactor: remove ContextAwareTimedRetryAlgorithm

* fix: make package-private + add ignored

* revert: revert to always using global settings for jittered

* test: add tests for NoopRetryingContext

* chore: cleanup 'this' usage

* refactor: merge context aware classes with base classes

* chore: cleanup and remove unnecessary methods

* test: add safety margin as the retry settings are now jittered

* docs: add javadoc

* chore: address review comments

* fix: address review comments

* fix: add tests + fix potential NPE

* fix: remove unused method + add tests

* test: add additional test calls

* fix: deprecation
  • Loading branch information
olavloite committed Mar 18, 2021
1 parent 86d5c72 commit 7f7aa25
Show file tree
Hide file tree
Showing 30 changed files with 2,093 additions and 254 deletions.
180 changes: 137 additions & 43 deletions gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcCallContext.java
Expand Up @@ -30,14 +30,17 @@
package com.google.api.gax.grpc;

import com.google.api.core.BetaApi;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.TransportChannel;
import com.google.api.gax.rpc.internal.Headers;
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.NoopApiTracer;
import com.google.auth.Credentials;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.grpc.CallCredentials;
import io.grpc.CallOptions;
import io.grpc.CallOptions.Key;
Expand All @@ -48,6 +51,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;
Expand All @@ -70,18 +74,36 @@ public final class GrpcCallContext implements ApiCallContext {
@Nullable private final Duration streamWaitTimeout;
@Nullable private final Duration streamIdleTimeout;
@Nullable private final Integer channelAffinity;
@Nullable private final RetrySettings retrySettings;
@Nullable private final ImmutableSet<StatusCode.Code> retryableCodes;
private final ImmutableMap<String, List<String>> extraHeaders;

/** Returns an empty instance with a null channel and default {@link CallOptions}. */
public static GrpcCallContext createDefault() {
return new GrpcCallContext(
null, CallOptions.DEFAULT, null, null, null, null, ImmutableMap.<String, List<String>>of());
null,
CallOptions.DEFAULT,
null,
null,
null,
null,
ImmutableMap.<String, List<String>>of(),
null,
null);
}

/** Returns an instance with the given channel and {@link CallOptions}. */
public static GrpcCallContext of(Channel channel, CallOptions callOptions) {
return new GrpcCallContext(
channel, callOptions, null, null, null, null, ImmutableMap.<String, List<String>>of());
channel,
callOptions,
null,
null,
null,
null,
ImmutableMap.<String, List<String>>of(),
null,
null);
}

private GrpcCallContext(
Expand All @@ -91,14 +113,18 @@ private GrpcCallContext(
@Nullable Duration streamWaitTimeout,
@Nullable Duration streamIdleTimeout,
@Nullable Integer channelAffinity,
ImmutableMap<String, List<String>> extraHeaders) {
ImmutableMap<String, List<String>> extraHeaders,
@Nullable RetrySettings retrySettings,
@Nullable Set<StatusCode.Code> retryableCodes) {
this.channel = channel;
this.callOptions = Preconditions.checkNotNull(callOptions);
this.timeout = timeout;
this.streamWaitTimeout = streamWaitTimeout;
this.streamIdleTimeout = streamIdleTimeout;
this.channelAffinity = channelAffinity;
this.extraHeaders = Preconditions.checkNotNull(extraHeaders);
this.retrySettings = retrySettings;
this.retryableCodes = retryableCodes == null ? null : ImmutableSet.copyOf(retryableCodes);
}

/**
Expand Down Expand Up @@ -160,7 +186,9 @@ public GrpcCallContext withTimeout(@Nullable Duration timeout) {
this.streamWaitTimeout,
this.streamIdleTimeout,
this.channelAffinity,
this.extraHeaders);
this.extraHeaders,
this.retrySettings,
this.retryableCodes);
}

@Nullable
Expand All @@ -177,13 +205,15 @@ public GrpcCallContext withStreamWaitTimeout(@Nullable Duration streamWaitTimeou
}

return new GrpcCallContext(
channel,
callOptions,
timeout,
this.channel,
this.callOptions,
this.timeout,
streamWaitTimeout,
streamIdleTimeout,
channelAffinity,
extraHeaders);
this.streamIdleTimeout,
this.channelAffinity,
this.extraHeaders,
this.retrySettings,
this.retryableCodes);
}

@Override
Expand All @@ -194,25 +224,29 @@ public GrpcCallContext withStreamIdleTimeout(@Nullable Duration streamIdleTimeou
}

return new GrpcCallContext(
channel,
callOptions,
timeout,
streamWaitTimeout,
this.channel,
this.callOptions,
this.timeout,
this.streamWaitTimeout,
streamIdleTimeout,
channelAffinity,
extraHeaders);
this.channelAffinity,
this.extraHeaders,
this.retrySettings,
this.retryableCodes);
}

@BetaApi("The surface for channel affinity is not stable yet and may change in the future.")
public GrpcCallContext withChannelAffinity(@Nullable Integer affinity) {
return new GrpcCallContext(
channel,
callOptions,
timeout,
streamWaitTimeout,
streamIdleTimeout,
this.channel,
this.callOptions,
this.timeout,
this.streamWaitTimeout,
this.streamIdleTimeout,
affinity,
extraHeaders);
this.extraHeaders,
this.retrySettings,
this.retryableCodes);
}

@BetaApi("The surface for extra headers is not stable yet and may change in the future.")
Expand All @@ -222,13 +256,53 @@ public GrpcCallContext withExtraHeaders(Map<String, List<String>> extraHeaders)
ImmutableMap<String, List<String>> newExtraHeaders =
Headers.mergeHeaders(this.extraHeaders, extraHeaders);
return new GrpcCallContext(
channel,
callOptions,
timeout,
streamWaitTimeout,
streamIdleTimeout,
channelAffinity,
newExtraHeaders);
this.channel,
this.callOptions,
this.timeout,
this.streamWaitTimeout,
this.streamIdleTimeout,
this.channelAffinity,
newExtraHeaders,
this.retrySettings,
this.retryableCodes);
}

@Override
public RetrySettings getRetrySettings() {
return this.retrySettings;
}

@Override
public GrpcCallContext withRetrySettings(RetrySettings retrySettings) {
return new GrpcCallContext(
this.channel,
this.callOptions,
this.timeout,
this.streamWaitTimeout,
this.streamIdleTimeout,
this.channelAffinity,
this.extraHeaders,
retrySettings,
this.retryableCodes);
}

@Override
public Set<StatusCode.Code> getRetryableCodes() {
return this.retryableCodes;
}

@Override
public GrpcCallContext withRetryableCodes(Set<StatusCode.Code> retryableCodes) {
return new GrpcCallContext(
this.channel,
this.callOptions,
this.timeout,
this.streamWaitTimeout,
this.streamIdleTimeout,
this.channelAffinity,
this.extraHeaders,
this.retrySettings,
retryableCodes);
}

@Override
Expand Down Expand Up @@ -283,8 +357,18 @@ public ApiCallContext merge(ApiCallContext inputCallContext) {
newChannelAffinity = this.channelAffinity;
}

RetrySettings newRetrySettings = grpcCallContext.retrySettings;
if (newRetrySettings == null) {
newRetrySettings = this.retrySettings;
}

Set<StatusCode.Code> newRetryableCodes = grpcCallContext.retryableCodes;
if (newRetryableCodes == null) {
newRetryableCodes = this.retryableCodes;
}

ImmutableMap<String, List<String>> newExtraHeaders =
Headers.mergeHeaders(extraHeaders, grpcCallContext.extraHeaders);
Headers.mergeHeaders(this.extraHeaders, grpcCallContext.extraHeaders);

CallOptions newCallOptions =
grpcCallContext
Expand All @@ -303,7 +387,9 @@ public ApiCallContext merge(ApiCallContext inputCallContext) {
newStreamWaitTimeout,
newStreamIdleTimeout,
newChannelAffinity,
newExtraHeaders);
newExtraHeaders,
newRetrySettings,
newRetryableCodes);
}

/** The {@link Channel} set on this context. */
Expand Down Expand Up @@ -357,23 +443,27 @@ public GrpcCallContext withChannel(Channel newChannel) {
return new GrpcCallContext(
newChannel,
this.callOptions,
timeout,
this.timeout,
this.streamWaitTimeout,
this.streamIdleTimeout,
this.channelAffinity,
this.extraHeaders);
this.extraHeaders,
this.retrySettings,
this.retryableCodes);
}

/** Returns a new instance with the call options set to the given call options. */
public GrpcCallContext withCallOptions(CallOptions newCallOptions) {
return new GrpcCallContext(
this.channel,
newCallOptions,
timeout,
this.timeout,
this.streamWaitTimeout,
this.streamIdleTimeout,
this.channelAffinity,
this.extraHeaders);
this.extraHeaders,
this.retrySettings,
this.retryableCodes);
}

public GrpcCallContext withRequestParamsDynamicHeaderOption(String requestParams) {
Expand Down Expand Up @@ -410,7 +500,9 @@ public int hashCode() {
streamWaitTimeout,
streamIdleTimeout,
channelAffinity,
extraHeaders);
extraHeaders,
retrySettings,
retryableCodes);
}

@Override
Expand All @@ -423,13 +515,15 @@ public boolean equals(Object o) {
}

GrpcCallContext that = (GrpcCallContext) o;
return Objects.equals(channel, that.channel)
&& Objects.equals(callOptions, that.callOptions)
&& Objects.equals(timeout, that.timeout)
&& Objects.equals(streamWaitTimeout, that.streamWaitTimeout)
&& Objects.equals(streamIdleTimeout, that.streamIdleTimeout)
&& Objects.equals(channelAffinity, that.channelAffinity)
&& Objects.equals(extraHeaders, that.extraHeaders);
return Objects.equals(this.channel, that.channel)
&& Objects.equals(this.callOptions, that.callOptions)
&& Objects.equals(this.timeout, that.timeout)
&& Objects.equals(this.streamWaitTimeout, that.streamWaitTimeout)
&& Objects.equals(this.streamIdleTimeout, that.streamIdleTimeout)
&& Objects.equals(this.channelAffinity, that.channelAffinity)
&& Objects.equals(this.extraHeaders, that.extraHeaders)
&& Objects.equals(this.retrySettings, that.retrySettings)
&& Objects.equals(this.retryableCodes, that.retryableCodes);
}

Metadata getMetadata() {
Expand Down

0 comments on commit 7f7aa25

Please sign in to comment.