Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Commit

Permalink
fix: stop overriding default grpc executor (#1355)
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Jul 29, 2021
1 parent 25a23db commit b1f8c43
Show file tree
Hide file tree
Showing 11 changed files with 227 additions and 44 deletions.
Expand Up @@ -135,6 +135,11 @@ private InstantiatingGrpcChannelProvider(Builder builder) {
: builder.directPathServiceConfig;
}

/**
* @deprecated If executor is not set, this channel provider will create channels with default
* grpc executor.
*/
@Deprecated
@Override
public boolean needsExecutor() {
return executor == null;
Expand Down Expand Up @@ -212,9 +217,7 @@ public TransportChannelProvider withCredentials(Credentials credentials) {

@Override
public TransportChannel getTransportChannel() throws IOException {
if (needsExecutor()) {
throw new IllegalStateException("getTransportChannel() called when needsExecutor() is true");
} else if (needsHeaders()) {
if (needsHeaders()) {
throw new IllegalStateException("getTransportChannel() called when needsHeaders() is true");
} else if (needsEndpoint()) {
throw new IllegalStateException("getTransportChannel() called when needsEndpoint() is true");
Expand Down
Expand Up @@ -74,6 +74,7 @@ public boolean shouldAutoClose() {
return true;
}

@Deprecated
@Override
public boolean needsExecutor() {
return false;
Expand Down
Expand Up @@ -93,6 +93,11 @@ private InstantiatingHttpJsonChannelProvider(
this.mtlsProvider = mtlsProvider;
}

/**
* @deprecated If executor is not set, this channel provider will create channels with default
* executor defined in {@link ManagedHttpJsonChannel}.
*/
@Deprecated
@Override
public boolean needsExecutor() {
return executor == null;
Expand Down Expand Up @@ -149,9 +154,7 @@ public String getTransportName() {

@Override
public TransportChannel getTransportChannel() throws IOException {
if (needsExecutor()) {
throw new IllegalStateException("getTransportChannel() called when needsExecutor() is true");
} else if (needsHeaders()) {
if (needsHeaders()) {
throw new IllegalStateException("getTransportChannel() called when needsHeaders() is true");
} else {
try {
Expand Down
Expand Up @@ -37,19 +37,23 @@
import com.google.api.core.BetaApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

/** Implementation of HttpJsonChannel which can issue http-json calls. */
@BetaApi
public class ManagedHttpJsonChannel implements HttpJsonChannel, BackgroundResource {
private static final JsonFactory JSON_FACTORY = GsonFactory.getDefaultInstance();
private static final ExecutorService DEFAULT_EXECUTOR =
InstantiatingExecutorProvider.newBuilder().build().getExecutor();

private final Executor executor;
private final String endpoint;
Expand Down Expand Up @@ -134,7 +138,9 @@ public boolean awaitTermination(long duration, TimeUnit unit) throws Interrupted
public void close() {}

public static Builder newBuilder() {
return new Builder().setHeaderEnhancers(new LinkedList<HttpJsonHeaderEnhancer>());
return new Builder()
.setHeaderEnhancers(new LinkedList<HttpJsonHeaderEnhancer>())
.setExecutor(DEFAULT_EXECUTOR);
}

public static class Builder {
Expand All @@ -147,7 +153,7 @@ public static class Builder {
private Builder() {}

public Builder setExecutor(Executor executor) {
this.executor = executor;
this.executor = Preconditions.checkNotNull(executor);
return this;
}

Expand All @@ -167,7 +173,6 @@ public Builder setHttpTransport(HttpTransport httpTransport) {
}

public ManagedHttpJsonChannel build() {
Preconditions.checkNotNull(executor);
Preconditions.checkNotNull(endpoint);
return new ManagedHttpJsonChannel(
executor, endpoint, jsonFactory, headerEnhancers, httpTransport);
Expand Down
28 changes: 19 additions & 9 deletions gax/src/main/java/com/google/api/gax/rpc/ClientContext.java
Expand Up @@ -50,7 +50,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nonnull;
Expand All @@ -73,6 +72,10 @@ public abstract class ClientContext {
*/
public abstract List<BackgroundResource> getBackgroundResources();

/**
* Gets the executor to use for running scheduled API call logic (such as retries and long-running
* operations).
*/
public abstract ScheduledExecutorService getExecutor();

@Nullable
Expand Down Expand Up @@ -163,8 +166,8 @@ static String getEndpoint(
public static ClientContext create(StubSettings settings) throws IOException {
ApiClock clock = settings.getClock();

ExecutorProvider executorProvider = settings.getExecutorProvider();
final ScheduledExecutorService executor = executorProvider.getExecutor();
ExecutorProvider backgroundExecutorProvider = settings.getBackgroundExecutorProvider();
final ScheduledExecutorService backgroundExecutor = backgroundExecutorProvider.getExecutor();

Credentials credentials = settings.getCredentialsProvider().getCredentials();

Expand All @@ -177,8 +180,11 @@ public static ClientContext create(StubSettings settings) throws IOException {
}

TransportChannelProvider transportChannelProvider = settings.getTransportChannelProvider();
if (transportChannelProvider.needsExecutor()) {
transportChannelProvider = transportChannelProvider.withExecutor((Executor) executor);
// After needsExecutor and StubSettings#setExecutorProvider are deprecated, transport channel
// executor can only be set from TransportChannelProvider#withExecutor directly, and a provider
// will have a default executor if it needs one.
if (transportChannelProvider.needsExecutor() && settings.getExecutorProvider() != null) {
transportChannelProvider = transportChannelProvider.withExecutor(backgroundExecutor);
}
Map<String, String> headers = getHeadersFromSettings(settings);
if (transportChannelProvider.needsHeaders()) {
Expand Down Expand Up @@ -216,7 +222,7 @@ public static ClientContext create(StubSettings settings) throws IOException {
watchdogProvider = watchdogProvider.withClock(clock);
}
if (watchdogProvider.needsExecutor()) {
watchdogProvider = watchdogProvider.withExecutor(executor);
watchdogProvider = watchdogProvider.withExecutor(backgroundExecutor);
}
watchdog = watchdogProvider.getWatchdog();
}
Expand All @@ -226,16 +232,16 @@ public static ClientContext create(StubSettings settings) throws IOException {
if (transportChannelProvider.shouldAutoClose()) {
backgroundResources.add(transportChannel);
}
if (executorProvider.shouldAutoClose()) {
backgroundResources.add(new ExecutorAsBackgroundResource(executor));
if (backgroundExecutorProvider.shouldAutoClose()) {
backgroundResources.add(new ExecutorAsBackgroundResource(backgroundExecutor));
}
if (watchdogProvider != null && watchdogProvider.shouldAutoClose()) {
backgroundResources.add(watchdog);
}

return newBuilder()
.setBackgroundResources(backgroundResources.build())
.setExecutor(executor)
.setExecutor(backgroundExecutor)
.setCredentials(credentials)
.setTransportChannel(transportChannel)
.setHeaders(ImmutableMap.copyOf(settings.getHeaderProvider().getHeaders()))
Expand Down Expand Up @@ -289,6 +295,10 @@ public abstract static class Builder {

public abstract Builder setBackgroundResources(List<BackgroundResource> backgroundResources);

/**
* Sets the executor to use for running scheduled API call logic (such as retries and
* long-running operations).
*/
public abstract Builder setExecutor(ScheduledExecutorService value);

public abstract Builder setCredentials(Credentials value);
Expand Down
47 changes: 46 additions & 1 deletion gax/src/main/java/com/google/api/gax/rpc/ClientSettings.java
Expand Up @@ -36,6 +36,7 @@
import com.google.api.gax.core.ExecutorProvider;
import com.google.common.base.MoreObjects;
import java.io.IOException;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;
Expand Down Expand Up @@ -63,10 +64,16 @@ public final StubSettings getStubSettings() {
return stubSettings;
}

/** @deprecated Please use {@link #getBackgroundExecutorProvider()} */
@Deprecated
public final ExecutorProvider getExecutorProvider() {
return stubSettings.getExecutorProvider();
}

public final ExecutorProvider getBackgroundExecutorProvider() {
return stubSettings.getBackgroundExecutorProvider();
}

public final TransportChannelProvider getTransportChannelProvider() {
return stubSettings.getTransportChannelProvider();
}
Expand Down Expand Up @@ -112,6 +119,7 @@ public final Duration getWatchdogCheckInterval() {
public String toString() {
return MoreObjects.toStringHelper(this)
.add("executorProvider", getExecutorProvider())
.add("backgroundExecutorProvider", getBackgroundExecutorProvider())
.add("transportChannelProvider", getTransportChannelProvider())
.add("credentialsProvider", getCredentialsProvider())
.add("headerProvider", getHeaderProvider())
Expand Down Expand Up @@ -159,9 +167,27 @@ protected StubSettings.Builder getStubSettings() {
* call logic (such as retries and long-running operations), and also to pass to the transport
* settings if an executor is needed for the transport and it doesn't have its own executor
* provider.
*
* @deprecated Please use {@link #setBackgroundExecutorProvider(ExecutorProvider)} for setting
* executor to use for running scheduled API call logic. To set executor for {@link
* TransportChannelProvider}, please use {@link
* TransportChannelProvider#withExecutor(Executor)} instead.
*/
@Deprecated
public B setExecutorProvider(ExecutorProvider executorProvider) {
stubSettings.setExecutorProvider(executorProvider);
stubSettings.setBackgroundExecutorProvider(executorProvider);
return self();
}

/**
* Sets the ExecutorProvider to use for getting the executor to use for running scheduled API
* call logic (such as retries and long-running operations). This will not set the executor in
* {@link TransportChannelProvider}. To set executor for {@link TransportChannelProvider},
* please use {@link TransportChannelProvider#withExecutor(Executor)}.
*/
public B setBackgroundExecutorProvider(ExecutorProvider executorProvider) {
stubSettings.setBackgroundExecutorProvider(executorProvider);
return self();
}

Expand Down Expand Up @@ -238,11 +264,29 @@ public B setWatchdogCheckInterval(@Nullable Duration checkInterval) {
return self();
}

/** Gets the ExecutorProvider that was previously set on this Builder. */
/**
* Gets the ExecutorProvider that was previously set on this Builder. This ExecutorProvider is
* to use for running asynchronous API call logic (such as retries and long-running operations),
* and also to pass to the transport settings if an executor is needed for the transport and it
* doesn't have its own executor provider.
*
* @deprecated Please use {@link #getBackgroundExecutorProvider()} for getting the executor
* provider that's used for running scheduled API call logic.
*/
@Deprecated
public ExecutorProvider getExecutorProvider() {
return stubSettings.getExecutorProvider();
}

/**
* Gets the ExecutorProvider that was previously set on this Builder. This ExecutorProvider is
* to use for running asynchronous API call logic (such as retries and long-running operations).
* This ExecutorProvider is not used to set the executor in {@link TransportChannelProvider}.
*/
public ExecutorProvider getBackgroundExecutorProvider() {
return stubSettings.getBackgroundExecutorProvider();
}

/** Gets the TransportProvider that was previously set on this Builder. */
public TransportChannelProvider getTransportChannelProvider() {
return stubSettings.getTransportChannelProvider();
Expand Down Expand Up @@ -303,6 +347,7 @@ protected static void applyToAllUnaryMethods(
public String toString() {
return MoreObjects.toStringHelper(this)
.add("executorProvider", getExecutorProvider())
.add("backgroundExecutorProvider", getBackgroundExecutorProvider())
.add("transportChannelProvider", getTransportChannelProvider())
.add("credentialsProvider", getCredentialsProvider())
.add("headerProvider", getHeaderProvider())
Expand Down

0 comments on commit b1f8c43

Please sign in to comment.