diff --git a/gax-grpc/src/main/java/com/google/api/gax/grpc/InstantiatingGrpcChannelProvider.java b/gax-grpc/src/main/java/com/google/api/gax/grpc/InstantiatingGrpcChannelProvider.java index dcfa5c39ee..589edeb24c 100644 --- a/gax-grpc/src/main/java/com/google/api/gax/grpc/InstantiatingGrpcChannelProvider.java +++ b/gax-grpc/src/main/java/com/google/api/gax/grpc/InstantiatingGrpcChannelProvider.java @@ -132,6 +132,7 @@ private InstantiatingGrpcChannelProvider(Builder builder) { : builder.directPathServiceConfig; } + @Deprecated @Override public boolean needsExecutor() { return executor == null; @@ -209,9 +210,7 @@ public TransportChannelProvider withCredentials(Credentials credentials) { @Override public TransportChannel getTransportChannel() throws IOException { - if (needsExecutor()) { - throw new IllegalStateException("getTransportChannel() called when needsExecutor() is true"); - } else if (needsHeaders()) { + if (needsHeaders()) { throw new IllegalStateException("getTransportChannel() called when needsHeaders() is true"); } else if (needsEndpoint()) { throw new IllegalStateException("getTransportChannel() called when needsEndpoint() is true"); diff --git a/gax-grpc/src/test/java/com/google/api/gax/grpc/InstantiatingGrpcChannelProviderTest.java b/gax-grpc/src/test/java/com/google/api/gax/grpc/InstantiatingGrpcChannelProviderTest.java index 72f3cc4646..31df53d4ef 100644 --- a/gax-grpc/src/test/java/com/google/api/gax/grpc/InstantiatingGrpcChannelProviderTest.java +++ b/gax-grpc/src/test/java/com/google/api/gax/grpc/InstantiatingGrpcChannelProviderTest.java @@ -36,6 +36,8 @@ import com.google.api.core.ApiFunction; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder; +import com.google.api.gax.grpc.testing.FakeServiceGrpc; +import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.HeaderProvider; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.mtls.AbstractMtlsTransportChannelTest; @@ -44,9 +46,17 @@ import com.google.auth.oauth2.ComputeEngineCredentials; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.SettableFuture; +import com.google.type.Color; +import com.google.type.Money; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.alts.ComputeEngineChannelBuilder; +import io.grpc.stub.ClientCalls; +import io.grpc.stub.StreamObserver; import java.io.IOException; import java.security.GeneralSecurityException; import java.util.ArrayList; @@ -54,9 +64,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.junit.Test; import org.junit.runner.RunWith; @@ -445,6 +457,62 @@ public void testWithDefaultDirectPathServiceConfig() { assertThat(childPolicy.keySet()).containsExactly("pick_first"); } + @Test + public void testDefaultExecutor() throws Exception { + InstantiatingGrpcChannelProvider provider = + InstantiatingGrpcChannelProvider.newBuilder() + .setEndpoint("localhost:1234") + .setHeaderProvider(FixedHeaderProvider.create()) + .build(); + + // The default name thread name for grpc threads configured in GrpcUtil + assertThat(extractExecutorThreadName(provider)).contains("grpc-default-executor"); + } + + /** + * Extract the name of the channel executor thread by instantiating a channel and issuing a fake + * call. + */ + private static String extractExecutorThreadName(InstantiatingGrpcChannelProvider channelProvider) + throws IOException, ExecutionException, InterruptedException { + GrpcTransportChannel transportChannel = + (GrpcTransportChannel) channelProvider.getTransportChannel(); + try { + Channel channel = transportChannel.getChannel(); + + ClientCall call = + channel.newCall(FakeServiceGrpc.METHOD_RECOGNIZE, CallOptions.DEFAULT); + Color request = Color.getDefaultInstance(); + + final SettableFuture threadNameFuture = SettableFuture.create(); + + // Issue a call just to get the thread name of the channel executor + ClientCalls.asyncUnaryCall( + call, + request, + new StreamObserver() { + @Override + public void onNext(Money ignored) { + threadNameFuture.set(Thread.currentThread().getName()); + } + + @Override + public void onError(Throwable ignored) { + threadNameFuture.set(Thread.currentThread().getName()); + } + + @Override + public void onCompleted() { + threadNameFuture.set(Thread.currentThread().getName()); + } + }); + return threadNameFuture.get(); + } finally { + transportChannel.shutdown(); + transportChannel.awaitTermination(10, TimeUnit.SECONDS); + } + } + @Nullable private static Map getAsObject(Map json, String key) { Object mapObject = json.get(key); diff --git a/gax-grpc/src/test/java/com/google/api/gax/grpc/testing/LocalChannelProvider.java b/gax-grpc/src/test/java/com/google/api/gax/grpc/testing/LocalChannelProvider.java index 00ba7a5f1f..5e538a06c2 100644 --- a/gax-grpc/src/test/java/com/google/api/gax/grpc/testing/LocalChannelProvider.java +++ b/gax-grpc/src/test/java/com/google/api/gax/grpc/testing/LocalChannelProvider.java @@ -74,6 +74,7 @@ public boolean shouldAutoClose() { return true; } + @Deprecated @Override public boolean needsExecutor() { return false; diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProvider.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProvider.java index b6da11a39d..0326727662 100644 --- a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProvider.java +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProvider.java @@ -93,6 +93,7 @@ private InstantiatingHttpJsonChannelProvider( this.mtlsProvider = mtlsProvider; } + @Deprecated @Override public boolean needsExecutor() { return executor == null; @@ -149,9 +150,7 @@ public String getTransportName() { @Override public TransportChannel getTransportChannel() throws IOException { - if (needsExecutor()) { - throw new IllegalStateException("getTransportChannel() called when needsExecutor() is true"); - } else if (needsHeaders()) { + if (needsHeaders()) { throw new IllegalStateException("getTransportChannel() called when needsHeaders() is true"); } else { try { diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java index df8afd391b..47e634a518 100644 --- a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java @@ -39,10 +39,13 @@ import com.google.api.gax.core.BackgroundResource; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; import java.util.LinkedList; import java.util.List; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -50,6 +53,12 @@ @BetaApi public class ManagedHttpJsonChannel implements HttpJsonChannel, BackgroundResource { private static final JsonFactory JSON_FACTORY = GsonFactory.getDefaultInstance(); + private static final ExecutorService DEFAULT_EXECUTOR = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("http-default-executor-%d") + .build()); private final Executor executor; private final String endpoint; @@ -134,7 +143,9 @@ public boolean awaitTermination(long duration, TimeUnit unit) throws Interrupted public void close() {} public static Builder newBuilder() { - return new Builder().setHeaderEnhancers(new LinkedList()); + return new Builder() + .setHeaderEnhancers(new LinkedList()) + .setExecutor(DEFAULT_EXECUTOR); } public static class Builder { @@ -147,7 +158,11 @@ public static class Builder { private Builder() {} public Builder setExecutor(Executor executor) { - this.executor = executor; + if (executor != null) { + this.executor = executor; + } else { + this.executor = DEFAULT_EXECUTOR; + } return this; } diff --git a/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java b/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java index bc00f164be..0bda7546f2 100644 --- a/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java +++ b/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java @@ -31,21 +31,34 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import com.google.api.core.ApiFuture; +import com.google.api.gax.httpjson.testing.MockHttpService; +import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.HeaderProvider; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.mtls.AbstractMtlsTransportChannelTest; import com.google.api.gax.rpc.mtls.MtlsProvider; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; import java.security.GeneralSecurityException; import java.util.Collections; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; @RunWith(JUnit4.class) public class InstantiatingHttpJsonChannelProviderTest extends AbstractMtlsTransportChannelTest { @@ -112,4 +125,85 @@ protected Object getMtlsObjectFromTransportChannel(MtlsProvider provider) .build(); return channelProvider.createHttpTransport(); } + + @Test + public void testDefaultExecutor() throws Exception { + // Create a mock service that will always return errors. We just want to inspect the thread that + // those errors are returned on + MockHttpService mockHttpService = + new MockHttpService(Collections.emptyList(), "/"); + mockHttpService.addException(new RuntimeException("Fake error")); + InstantiatingHttpJsonChannelProvider channelProvider = + InstantiatingHttpJsonChannelProvider.newBuilder() + .setEndpoint("localhost:1234") + .setHeaderProvider(FixedHeaderProvider.create()) + .setHttpTransport(mockHttpService) + .build(); + + assertThat(getThreadName(channelProvider)).contains("http-default-executor"); + } + + @Test + public void testExecutorOverride() throws IOException, ExecutionException, InterruptedException { + MockHttpService mockHttpService = + new MockHttpService(Collections.emptyList(), "/"); + mockHttpService.addException(new RuntimeException("Fake error")); + + final String expectedThreadName = "testExecutorOverrideExecutor"; + + ExecutorService executor = + Executors.newFixedThreadPool( + 1, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat(expectedThreadName).build()); + try { + InstantiatingHttpJsonChannelProvider channelProvider = + InstantiatingHttpJsonChannelProvider.newBuilder() + .setExecutor(executor) + .setEndpoint("localhost:1234") + .setHeaderProvider(FixedHeaderProvider.create()) + .setHttpTransport(mockHttpService) + .build(); + + assertThat(getThreadName(channelProvider)).isEqualTo(expectedThreadName); + } finally { + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + } + } + + private static String getThreadName(InstantiatingHttpJsonChannelProvider provider) + throws IOException, InterruptedException, ExecutionException { + @SuppressWarnings("unchecked") + ApiMethodDescriptor apiMethodDescriptor = + mock( + ApiMethodDescriptor.class, + new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + throw new UnsupportedOperationException("fake error"); + } + }); + + HttpJsonTransportChannel transportChannel = + (HttpJsonTransportChannel) provider.getTransportChannel(); + final SettableFuture threadNameFuture = SettableFuture.create(); + try { + HttpJsonChannel channel = transportChannel.getChannel(); + ApiFuture rpcFuture = + channel.issueFutureUnaryCall( + HttpJsonCallOptions.newBuilder().build(), new Object(), apiMethodDescriptor); + rpcFuture.addListener( + new Runnable() { + @Override + public void run() { + threadNameFuture.set(Thread.currentThread().getName()); + } + }, + MoreExecutors.directExecutor()); + } finally { + transportChannel.shutdown(); + transportChannel.awaitTermination(10, TimeUnit.SECONDS); + } + return threadNameFuture.get(); + } } diff --git a/gax/src/main/java/com/google/api/gax/rpc/ClientContext.java b/gax/src/main/java/com/google/api/gax/rpc/ClientContext.java index c8a1920c93..cd856a39f3 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/ClientContext.java +++ b/gax/src/main/java/com/google/api/gax/rpc/ClientContext.java @@ -50,7 +50,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import javax.annotation.Nonnull; @@ -163,8 +162,13 @@ static String getEndpoint( public static ClientContext create(StubSettings settings) throws IOException { ApiClock clock = settings.getClock(); - ExecutorProvider executorProvider = settings.getExecutorProvider(); - final ScheduledExecutorService executor = executorProvider.getExecutor(); + ExecutorProvider workerExecutorProvider = settings.getWorkerExecutorProvider(); + final ScheduledExecutorService workerExecutor = workerExecutorProvider.getExecutor(); + + final ScheduledExecutorService executor = + settings.getExecutorProvider() == null + ? null + : settings.getExecutorProvider().getExecutor(); Credentials credentials = settings.getCredentialsProvider().getCredentials(); @@ -177,8 +181,11 @@ public static ClientContext create(StubSettings settings) throws IOException { } TransportChannelProvider transportChannelProvider = settings.getTransportChannelProvider(); - if (transportChannelProvider.needsExecutor()) { - transportChannelProvider = transportChannelProvider.withExecutor((Executor) executor); + // After needsExecutor and StubSettings#setExecutor are deprecated, transport channel executor + // can only be set from TransportChannelProvider#withExecutor directly, and all providers will + // have default executors. + if (transportChannelProvider.needsExecutor() && executor != null) { + transportChannelProvider = transportChannelProvider.withExecutor(executor); } Map headers = getHeadersFromSettings(settings); if (transportChannelProvider.needsHeaders()) { @@ -216,7 +223,7 @@ public static ClientContext create(StubSettings settings) throws IOException { watchdogProvider = watchdogProvider.withClock(clock); } if (watchdogProvider.needsExecutor()) { - watchdogProvider = watchdogProvider.withExecutor(executor); + watchdogProvider = watchdogProvider.withExecutor(workerExecutor); } watchdog = watchdogProvider.getWatchdog(); } @@ -226,8 +233,8 @@ public static ClientContext create(StubSettings settings) throws IOException { if (transportChannelProvider.shouldAutoClose()) { backgroundResources.add(transportChannel); } - if (executorProvider.shouldAutoClose()) { - backgroundResources.add(new ExecutorAsBackgroundResource(executor)); + if (workerExecutorProvider.shouldAutoClose()) { + backgroundResources.add(new ExecutorAsBackgroundResource(workerExecutor)); } if (watchdogProvider != null && watchdogProvider.shouldAutoClose()) { backgroundResources.add(watchdog); @@ -235,7 +242,7 @@ public static ClientContext create(StubSettings settings) throws IOException { return newBuilder() .setBackgroundResources(backgroundResources.build()) - .setExecutor(executor) + .setExecutor(workerExecutor) .setCredentials(credentials) .setTransportChannel(transportChannel) .setHeaders(ImmutableMap.copyOf(settings.getHeaderProvider().getHeaders())) diff --git a/gax/src/main/java/com/google/api/gax/rpc/ClientSettings.java b/gax/src/main/java/com/google/api/gax/rpc/ClientSettings.java index 7575fea2b4..d532f75d6c 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/ClientSettings.java +++ b/gax/src/main/java/com/google/api/gax/rpc/ClientSettings.java @@ -64,7 +64,7 @@ public final StubSettings getStubSettings() { } public final ExecutorProvider getExecutorProvider() { - return stubSettings.getExecutorProvider(); + return stubSettings.getWorkerExecutorProvider(); } public final TransportChannelProvider getTransportChannelProvider() { @@ -162,6 +162,7 @@ protected StubSettings.Builder getStubSettings() { */ public B setExecutorProvider(ExecutorProvider executorProvider) { stubSettings.setExecutorProvider(executorProvider); + stubSettings.setWorkerExecutorProvider(executorProvider); return self(); } @@ -240,7 +241,7 @@ public B setWatchdogCheckInterval(@Nullable Duration checkInterval) { /** Gets the ExecutorProvider that was previously set on this Builder. */ public ExecutorProvider getExecutorProvider() { - return stubSettings.getExecutorProvider(); + return stubSettings.getWorkerExecutorProvider(); } /** Gets the TransportProvider that was previously set on this Builder. */ diff --git a/gax/src/main/java/com/google/api/gax/rpc/StubSettings.java b/gax/src/main/java/com/google/api/gax/rpc/StubSettings.java index 25fc85512e..c95af81262 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/StubSettings.java +++ b/gax/src/main/java/com/google/api/gax/rpc/StubSettings.java @@ -45,6 +45,7 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import java.io.IOException; +import java.util.concurrent.Executor; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.threeten.bp.Duration; @@ -64,6 +65,7 @@ public abstract class StubSettings> { static final String QUOTA_PROJECT_ID_HEADER_KEY = "x-goog-user-project"; private final ExecutorProvider executorProvider; + private final ExecutorProvider workerExecutorProvider; private final CredentialsProvider credentialsProvider; private final HeaderProvider headerProvider; private final HeaderProvider internalHeaderProvider; @@ -87,6 +89,7 @@ public abstract class StubSettings> { /** Constructs an instance of StubSettings. */ protected StubSettings(Builder builder) { this.executorProvider = builder.executorProvider; + this.workerExecutorProvider = builder.workerExecutorProvider; this.transportChannelProvider = builder.transportChannelProvider; this.credentialsProvider = builder.credentialsProvider; this.headerProvider = builder.headerProvider; @@ -101,10 +104,16 @@ protected StubSettings(Builder builder) { this.tracerFactory = builder.tracerFactory; } + /** @deprecated Please use {@link #getWorkerExecutorProvider()}. */ + @Deprecated public final ExecutorProvider getExecutorProvider() { return executorProvider; } + public final ExecutorProvider getWorkerExecutorProvider() { + return workerExecutorProvider; + } + public final TransportChannelProvider getTransportChannelProvider() { return transportChannelProvider; } @@ -169,6 +178,7 @@ public ApiTracerFactory getTracerFactory() { public String toString() { return MoreObjects.toStringHelper(this) .add("executorProvider", executorProvider) + .add("workerExecutorProvider", workerExecutorProvider) .add("transportChannelProvider", transportChannelProvider) .add("credentialsProvider", credentialsProvider) .add("headerProvider", headerProvider) @@ -190,6 +200,7 @@ public abstract static class Builder< SettingsT extends StubSettings, B extends Builder> { private ExecutorProvider executorProvider; + private ExecutorProvider workerExecutorProvider; private CredentialsProvider credentialsProvider; private HeaderProvider headerProvider; private HeaderProvider internalHeaderProvider; @@ -213,6 +224,7 @@ public abstract static class Builder< /** Create a builder from a StubSettings object. */ protected Builder(StubSettings settings) { this.executorProvider = settings.executorProvider; + this.workerExecutorProvider = settings.workerExecutorProvider; this.transportChannelProvider = settings.transportChannelProvider; this.credentialsProvider = settings.credentialsProvider; this.headerProvider = settings.headerProvider; @@ -246,7 +258,8 @@ private static String getQuotaProjectIdFromClientContext(ClientContext clientCon protected Builder(ClientContext clientContext) { if (clientContext == null) { - this.executorProvider = InstantiatingExecutorProvider.newBuilder().build(); + this.executorProvider = null; + this.workerExecutorProvider = InstantiatingExecutorProvider.newBuilder().build(); this.transportChannelProvider = null; this.credentialsProvider = NoCredentialsProvider.create(); this.headerProvider = new NoHeaderProvider(); @@ -260,6 +273,7 @@ protected Builder(ClientContext clientContext) { this.tracerFactory = NoopApiTracerFactory.getInstance(); } else { this.executorProvider = FixedExecutorProvider.create(clientContext.getExecutor()); + this.workerExecutorProvider = FixedExecutorProvider.create(clientContext.getExecutor()); this.transportChannelProvider = FixedTransportChannelProvider.create(clientContext.getTransportChannel()); this.credentialsProvider = FixedCredentialsProvider.create(clientContext.getCredentials()); @@ -289,13 +303,29 @@ protected B self() { } /** - * Sets the ExecutorProvider to use for getting the executor to use for running asynchronous API - * call logic (such as retries and long-running operations), and also to pass to the transport - * settings if an executor is needed for the transport and it doesn't have its own executor - * provider. + * @deprecated Please use {@link #setWorkerExecutorProvider(ExecutorProvider)} for setting + * executor to use for running asynchronous API call logic. To set executor for {@link + * TransportChannelProvider}, please use {@link + * TransportChannelProvider#withExecutor(Executor)} instead. */ + @Deprecated public B setExecutorProvider(ExecutorProvider executorProvider) { + // For backward compatibility, this will set both executorProvider and workerExecutorProvider. + // ExecutorProvider is null by default. In ClientContext#create(), if TransportChannelProvider + // doesn't have an executor, executorProvider will be used as TransportChannelProvider's + // executor. After this method is deprecated, TransportChannelProvider's executor can only be + // set with TransportChannelProvider#withExecutor. this.executorProvider = executorProvider; + this.workerExecutorProvider = executorProvider; + return self(); + } + + /** + * Sets the executor to use for running asynchronous API call logic (such as retries and + * long-running operations). + */ + public B setWorkerExecutorProvider(ExecutorProvider workerExecutorProvider) { + this.workerExecutorProvider = workerExecutorProvider; return self(); } @@ -416,11 +446,17 @@ public B setTracerFactory(@Nonnull ApiTracerFactory tracerFactory) { return self(); } - /** Gets the ExecutorProvider that was previously set on this Builder. */ + /** @deprecated Please use {@link #getWorkerExecutorProvider()}. */ + @Deprecated public ExecutorProvider getExecutorProvider() { return executorProvider; } + /** Gets the ExecutorProvider that was previously set on this Builder. */ + public ExecutorProvider getWorkerExecutorProvider() { + return workerExecutorProvider; + } + /** Gets the TransportProvider that was previously set on this Builder. */ public TransportChannelProvider getTransportChannelProvider() { return transportChannelProvider; @@ -494,6 +530,7 @@ protected static void applyToAllUnaryMethods( public String toString() { return MoreObjects.toStringHelper(this) .add("executorProvider", executorProvider) + .add("workerExecutorProvider", workerExecutorProvider) .add("transportChannelProvider", transportChannelProvider) .add("credentialsProvider", credentialsProvider) .add("headerProvider", headerProvider) diff --git a/gax/src/main/java/com/google/api/gax/rpc/TransportChannelProvider.java b/gax/src/main/java/com/google/api/gax/rpc/TransportChannelProvider.java index 3cfc9349e3..d754c371ea 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/TransportChannelProvider.java +++ b/gax/src/main/java/com/google/api/gax/rpc/TransportChannelProvider.java @@ -64,6 +64,7 @@ public interface TransportChannelProvider { boolean shouldAutoClose(); /** True if the TransportProvider needs an executor. */ + @Deprecated boolean needsExecutor(); /** diff --git a/gax/src/test/java/com/google/api/gax/rpc/ClientContextTest.java b/gax/src/test/java/com/google/api/gax/rpc/ClientContextTest.java index 9bacd72c0a..7fe418dd68 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/ClientContextTest.java +++ b/gax/src/test/java/com/google/api/gax/rpc/ClientContextTest.java @@ -176,12 +176,10 @@ public TransportChannelProvider withPoolSize(int size) { @Override public TransportChannel getTransportChannel() throws IOException { - if (needsExecutor()) { - throw new IllegalStateException("Needs Executor"); - } if (needsCredentials()) { throw new IllegalStateException("Needs Credentials"); } + transport.setExecutor(executor); return transport; } @@ -719,4 +717,56 @@ public void testSwitchToMtlsEndpointAllowed() throws IOException { assertFalse(settings.getSwitchToMtlsEndpointAllowed()); assertEquals(endpoint, settings.getEndpoint()); } + + @Test + public void testExecutorSettings() throws Exception { + TransportChannelProvider transportChannelProvider = + new FakeTransportProvider( + FakeTransportChannel.create(new FakeChannel()), null, true, null, null); + + ClientSettings.Builder builder = + new FakeClientSettings.Builder() + .setTransportChannelProvider(transportChannelProvider) + .setCredentialsProvider( + FixedCredentialsProvider.create(Mockito.mock(GoogleCredentials.class))); + + // By default, if executor is not set, channel provider should not have an executor set + ClientContext context = ClientContext.create(builder.build()); + FakeTransportChannel transportChannel = (FakeTransportChannel) context.getTransportChannel(); + assertThat(transportChannel.getExecutor()).isNull(); + + ExecutorProvider channelExecutorProvider = + FixedExecutorProvider.create(Mockito.mock(ScheduledExecutorService.class)); + builder.setTransportChannelProvider( + transportChannelProvider.withExecutor((Executor) channelExecutorProvider.getExecutor())); + context = ClientContext.create(builder.build()); + transportChannel = (FakeTransportChannel) context.getTransportChannel(); + assertThat(transportChannel.getExecutor()) + .isSameInstanceAs(channelExecutorProvider.getExecutor()); + + ExecutorProvider executorProvider = + FixedExecutorProvider.create(Mockito.mock(ScheduledExecutorService.class)); + assertThat(channelExecutorProvider.getExecutor()) + .isNotSameInstanceAs(executorProvider.getExecutor()); + + // For backward compatibility, if executor is set from stubSettings.setExecutor, and transport + // channel already has an executor, the ExecutorProvider set in stubSettings won't override + // transport channel's executor + builder.setExecutorProvider(executorProvider); + context = ClientContext.create(builder.build()); + transportChannel = (FakeTransportChannel) context.getTransportChannel(); + assertThat(transportChannel.getExecutor()) + .isSameInstanceAs(channelExecutorProvider.getExecutor()); + + // For backward compatibility, if executor is set from stubSettings.setExecutor, and transport + // channel doesn't have an executor, transport channel will get the executor from + // stubSettings.setExecutor + builder.setExecutorProvider(executorProvider); + builder.setTransportChannelProvider( + new FakeTransportProvider( + FakeTransportChannel.create(new FakeChannel()), null, true, null, null)); + context = ClientContext.create(builder.build()); + transportChannel = (FakeTransportChannel) context.getTransportChannel(); + assertThat(transportChannel.getExecutor()).isSameInstanceAs(executorProvider.getExecutor()); + } } diff --git a/gax/src/test/java/com/google/api/gax/rpc/ClientSettingsTest.java b/gax/src/test/java/com/google/api/gax/rpc/ClientSettingsTest.java index d777f506a7..87a73b1310 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/ClientSettingsTest.java +++ b/gax/src/test/java/com/google/api/gax/rpc/ClientSettingsTest.java @@ -111,6 +111,8 @@ public void testEmptyBuilder() throws Exception { FakeClientSettings.Builder builder = new FakeClientSettings.Builder(); Truth.assertThat(builder.getExecutorProvider()) .isInstanceOf(InstantiatingExecutorProvider.class); + Truth.assertThat(builder.getStubSettings().getWorkerExecutorProvider()) + .isSameInstanceAs(builder.getExecutorProvider()); Truth.assertThat(builder.getTransportChannelProvider()).isNull(); Truth.assertThat(builder.getCredentialsProvider()).isInstanceOf(NoCredentialsProvider.class); Truth.assertThat(builder.getClock()).isInstanceOf(NanoClock.class); @@ -124,6 +126,8 @@ public void testEmptyBuilder() throws Exception { FakeClientSettings settings = builder.build(); Truth.assertThat(settings.getExecutorProvider()) .isSameInstanceAs(builder.getExecutorProvider()); + Truth.assertThat(settings.getStubSettings().getWorkerExecutorProvider()) + .isSameInstanceAs(settings.getExecutorProvider()); Truth.assertThat(settings.getTransportChannelProvider()) .isSameInstanceAs(builder.getTransportChannelProvider()); Truth.assertThat(settings.getCredentialsProvider()) @@ -173,6 +177,8 @@ public void testBuilder() throws Exception { builder.setQuotaProjectId(quotaProjectId); Truth.assertThat(builder.getExecutorProvider()).isSameInstanceAs(executorProvider); + Truth.assertThat(builder.getStubSettings().getWorkerExecutorProvider()) + .isSameInstanceAs(executorProvider); Truth.assertThat(builder.getTransportChannelProvider()).isSameInstanceAs(transportProvider); Truth.assertThat(builder.getCredentialsProvider()).isSameInstanceAs(credentialsProvider); Truth.assertThat(builder.getClock()).isSameInstanceAs(clock); @@ -223,6 +229,8 @@ public void testBuilderFromClientContext() throws Exception { FakeClientSettings.Builder builder = new FakeClientSettings.Builder(clientContext); Truth.assertThat(builder.getExecutorProvider()).isInstanceOf(FixedExecutorProvider.class); + Truth.assertThat(builder.getStubSettings().getWorkerExecutorProvider()) + .isSameInstanceAs(builder.getExecutorProvider()); Truth.assertThat(builder.getTransportChannelProvider()) .isInstanceOf(FixedTransportChannelProvider.class); Truth.assertThat(builder.getCredentialsProvider()).isInstanceOf(FixedCredentialsProvider.class); @@ -263,6 +271,8 @@ public void testBuilderFromSettings() throws Exception { FakeClientSettings.Builder newBuilder = new FakeClientSettings.Builder(settings); Truth.assertThat(newBuilder.getExecutorProvider()).isSameInstanceAs(executorProvider); + Truth.assertThat(newBuilder.getStubSettings().getWorkerExecutorProvider()) + .isSameInstanceAs(executorProvider); Truth.assertThat(newBuilder.getTransportChannelProvider()).isSameInstanceAs(transportProvider); Truth.assertThat(newBuilder.getCredentialsProvider()).isSameInstanceAs(credentialsProvider); Truth.assertThat(newBuilder.getClock()).isSameInstanceAs(clock); diff --git a/gax/src/test/java/com/google/api/gax/rpc/testing/FakeTransportChannel.java b/gax/src/test/java/com/google/api/gax/rpc/testing/FakeTransportChannel.java index b6c56a7012..0d4abac8f1 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/testing/FakeTransportChannel.java +++ b/gax/src/test/java/com/google/api/gax/rpc/testing/FakeTransportChannel.java @@ -32,6 +32,7 @@ import com.google.api.core.InternalApi; import com.google.api.gax.rpc.TransportChannel; import java.util.Map; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @InternalApi("for testing") @@ -39,6 +40,7 @@ public class FakeTransportChannel implements TransportChannel { private final FakeChannel channel; private volatile boolean isShutdown = false; private volatile Map headers; + private volatile Executor executor; private FakeTransportChannel(FakeChannel channel) { this.channel = channel; @@ -102,4 +104,12 @@ public void setHeaders(Map headers) { public Map getHeaders() { return this.headers; } + + public void setExecutor(Executor executor) { + this.executor = executor; + } + + public Executor getExecutor() { + return executor; + } }