diff --git a/gax-grpc/src/main/java/com/google/api/gax/grpc/InstantiatingGrpcChannelProvider.java b/gax-grpc/src/main/java/com/google/api/gax/grpc/InstantiatingGrpcChannelProvider.java index 9797b4d7d6..5346ceb989 100644 --- a/gax-grpc/src/main/java/com/google/api/gax/grpc/InstantiatingGrpcChannelProvider.java +++ b/gax-grpc/src/main/java/com/google/api/gax/grpc/InstantiatingGrpcChannelProvider.java @@ -123,6 +123,7 @@ private InstantiatingGrpcChannelProvider(Builder builder) { : builder.directPathServiceConfig; } + @Deprecated @Override public boolean needsExecutor() { return executor == null; @@ -200,9 +201,7 @@ public TransportChannelProvider withCredentials(Credentials credentials) { @Override public TransportChannel getTransportChannel() throws IOException { - if (needsExecutor()) { - throw new IllegalStateException("getTransportChannel() called when needsExecutor() is true"); - } else if (needsHeaders()) { + if (needsHeaders()) { throw new IllegalStateException("getTransportChannel() called when needsHeaders() is true"); } else if (needsEndpoint()) { throw new IllegalStateException("getTransportChannel() called when needsEndpoint() is true"); diff --git a/gax-grpc/src/test/java/com/google/api/gax/grpc/InstantiatingGrpcChannelProviderTest.java b/gax-grpc/src/test/java/com/google/api/gax/grpc/InstantiatingGrpcChannelProviderTest.java index 47f612db5d..0f09defd38 100644 --- a/gax-grpc/src/test/java/com/google/api/gax/grpc/InstantiatingGrpcChannelProviderTest.java +++ b/gax-grpc/src/test/java/com/google/api/gax/grpc/InstantiatingGrpcChannelProviderTest.java @@ -36,24 +36,36 @@ import com.google.api.core.ApiFunction; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder; +import com.google.api.gax.grpc.testing.FakeServiceGrpc; +import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.HeaderProvider; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.auth.oauth2.CloudShellCredentials; import com.google.auth.oauth2.ComputeEngineCredentials; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.SettableFuture; +import com.google.type.Color; +import com.google.type.Money; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.alts.ComputeEngineChannelBuilder; +import io.grpc.stub.ClientCalls; +import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.junit.Test; import org.junit.runner.RunWith; @@ -443,6 +455,62 @@ public void testWithDefaultDirectPathServiceConfig() { assertThat(childPolicy.keySet()).containsExactly("pick_first"); } + @Test + public void testDefaultExecutor() throws Exception { + InstantiatingGrpcChannelProvider provider = + InstantiatingGrpcChannelProvider.newBuilder() + .setEndpoint("localhost:1234") + .setHeaderProvider(FixedHeaderProvider.create()) + .build(); + + // The default name thread name for grpc threads configured in GrpcUtil + assertThat(extractExecutorThreadName(provider)).contains("grpc-default-executor"); + } + + /** + * Extract the name of the channel executor thread by instantiating a channel and issuing a fake + * call. + */ + private static String extractExecutorThreadName(InstantiatingGrpcChannelProvider channelProvider) + throws IOException, ExecutionException, InterruptedException { + GrpcTransportChannel transportChannel = + (GrpcTransportChannel) channelProvider.getTransportChannel(); + try { + Channel channel = transportChannel.getChannel(); + + ClientCall call = + channel.newCall(FakeServiceGrpc.METHOD_RECOGNIZE, CallOptions.DEFAULT); + Color request = Color.getDefaultInstance(); + + final SettableFuture threadNameFuture = SettableFuture.create(); + + // Issue a call just to get the thread name of the channel executor + ClientCalls.asyncUnaryCall( + call, + request, + new StreamObserver() { + @Override + public void onNext(Money ignored) { + threadNameFuture.set(Thread.currentThread().getName()); + } + + @Override + public void onError(Throwable ignored) { + threadNameFuture.set(Thread.currentThread().getName()); + } + + @Override + public void onCompleted() { + threadNameFuture.set(Thread.currentThread().getName()); + } + }); + return threadNameFuture.get(); + } finally { + transportChannel.shutdown(); + transportChannel.awaitTermination(10, TimeUnit.SECONDS); + } + } + @Nullable private static Map getAsObject(Map json, String key) { Object mapObject = json.get(key); diff --git a/gax-grpc/src/test/java/com/google/api/gax/grpc/testing/LocalChannelProvider.java b/gax-grpc/src/test/java/com/google/api/gax/grpc/testing/LocalChannelProvider.java index 00ba7a5f1f..5e538a06c2 100644 --- a/gax-grpc/src/test/java/com/google/api/gax/grpc/testing/LocalChannelProvider.java +++ b/gax-grpc/src/test/java/com/google/api/gax/grpc/testing/LocalChannelProvider.java @@ -74,6 +74,7 @@ public boolean shouldAutoClose() { return true; } + @Deprecated @Override public boolean needsExecutor() { return false; diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProvider.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProvider.java index 198ce4d7d3..427331839b 100644 --- a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProvider.java +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProvider.java @@ -84,6 +84,7 @@ private InstantiatingHttpJsonChannelProvider( this.httpTransport = httpTransport; } + @Deprecated @Override public boolean needsExecutor() { return executor == null; @@ -140,9 +141,7 @@ public String getTransportName() { @Override public TransportChannel getTransportChannel() throws IOException { - if (needsExecutor()) { - throw new IllegalStateException("getTransportChannel() called when needsExecutor() is true"); - } else if (needsHeaders()) { + if (needsHeaders()) { throw new IllegalStateException("getTransportChannel() called when needsHeaders() is true"); } else { return createChannel(); diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java index df8afd391b..47e634a518 100644 --- a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java @@ -39,10 +39,13 @@ import com.google.api.gax.core.BackgroundResource; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; import java.util.LinkedList; import java.util.List; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -50,6 +53,12 @@ @BetaApi public class ManagedHttpJsonChannel implements HttpJsonChannel, BackgroundResource { private static final JsonFactory JSON_FACTORY = GsonFactory.getDefaultInstance(); + private static final ExecutorService DEFAULT_EXECUTOR = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("http-default-executor-%d") + .build()); private final Executor executor; private final String endpoint; @@ -134,7 +143,9 @@ public boolean awaitTermination(long duration, TimeUnit unit) throws Interrupted public void close() {} public static Builder newBuilder() { - return new Builder().setHeaderEnhancers(new LinkedList()); + return new Builder() + .setHeaderEnhancers(new LinkedList()) + .setExecutor(DEFAULT_EXECUTOR); } public static class Builder { @@ -147,7 +158,11 @@ public static class Builder { private Builder() {} public Builder setExecutor(Executor executor) { - this.executor = executor; + if (executor != null) { + this.executor = executor; + } else { + this.executor = DEFAULT_EXECUTOR; + } return this; } diff --git a/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java b/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java index baff7bee57..30c80a39d2 100644 --- a/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java +++ b/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java @@ -31,16 +31,29 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import com.google.api.core.ApiFuture; +import com.google.api.gax.httpjson.testing.MockHttpService; +import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; import java.util.Collections; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; @RunWith(JUnit4.class) public class InstantiatingHttpJsonChannelProviderTest { @@ -94,4 +107,85 @@ public void basicTest() throws IOException { // Make sure we can create channels OK. provider.getTransportChannel().shutdownNow(); } + + @Test + public void testDefaultExecutor() throws Exception { + // Create a mock service that will always return errors. We just want to inspect the thread that + // those errors are returned on + MockHttpService mockHttpService = + new MockHttpService(Collections.emptyList(), "/"); + mockHttpService.addException(new RuntimeException("Fake error")); + InstantiatingHttpJsonChannelProvider channelProvider = + InstantiatingHttpJsonChannelProvider.newBuilder() + .setEndpoint("localhost:1234") + .setHeaderProvider(FixedHeaderProvider.create()) + .setHttpTransport(mockHttpService) + .build(); + + assertThat(getThreadName(channelProvider)).contains("http-default-executor"); + } + + @Test + public void testExecutorOverride() throws IOException, ExecutionException, InterruptedException { + MockHttpService mockHttpService = + new MockHttpService(Collections.emptyList(), "/"); + mockHttpService.addException(new RuntimeException("Fake error")); + + final String expectedThreadName = "testExecutorOverrideExecutor"; + + ExecutorService executor = + Executors.newFixedThreadPool( + 1, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat(expectedThreadName).build()); + try { + InstantiatingHttpJsonChannelProvider channelProvider = + InstantiatingHttpJsonChannelProvider.newBuilder() + .setExecutor(executor) + .setEndpoint("localhost:1234") + .setHeaderProvider(FixedHeaderProvider.create()) + .setHttpTransport(mockHttpService) + .build(); + + assertThat(getThreadName(channelProvider)).isEqualTo(expectedThreadName); + } finally { + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + } + } + + private static String getThreadName(InstantiatingHttpJsonChannelProvider provider) + throws IOException, InterruptedException, ExecutionException { + @SuppressWarnings("unchecked") + ApiMethodDescriptor apiMethodDescriptor = + mock( + ApiMethodDescriptor.class, + new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + throw new UnsupportedOperationException("fake error"); + } + }); + + HttpJsonTransportChannel transportChannel = + (HttpJsonTransportChannel) provider.getTransportChannel(); + final SettableFuture threadNameFuture = SettableFuture.create(); + try { + HttpJsonChannel channel = transportChannel.getChannel(); + ApiFuture rpcFuture = + channel.issueFutureUnaryCall( + HttpJsonCallOptions.newBuilder().build(), new Object(), apiMethodDescriptor); + rpcFuture.addListener( + new Runnable() { + @Override + public void run() { + threadNameFuture.set(Thread.currentThread().getName()); + } + }, + MoreExecutors.directExecutor()); + } finally { + transportChannel.shutdown(); + transportChannel.awaitTermination(10, TimeUnit.SECONDS); + } + return threadNameFuture.get(); + } } diff --git a/gax/src/main/java/com/google/api/gax/rpc/ClientContext.java b/gax/src/main/java/com/google/api/gax/rpc/ClientContext.java index ac2335235b..f986f16b98 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/ClientContext.java +++ b/gax/src/main/java/com/google/api/gax/rpc/ClientContext.java @@ -49,7 +49,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import javax.annotation.Nonnull; @@ -139,8 +138,13 @@ public static ClientContext create(ClientSettings settings) throws IOException { public static ClientContext create(StubSettings settings) throws IOException { ApiClock clock = settings.getClock(); - ExecutorProvider executorProvider = settings.getExecutorProvider(); - final ScheduledExecutorService executor = executorProvider.getExecutor(); + ExecutorProvider workerExecutorProvider = settings.getWorkerExecutorProvider(); + final ScheduledExecutorService workerExecutor = workerExecutorProvider.getExecutor(); + + final ScheduledExecutorService executor = + settings.getExecutorProvider() == null + ? null + : settings.getExecutorProvider().getExecutor(); Credentials credentials = settings.getCredentialsProvider().getCredentials(); @@ -153,8 +157,11 @@ public static ClientContext create(StubSettings settings) throws IOException { } TransportChannelProvider transportChannelProvider = settings.getTransportChannelProvider(); - if (transportChannelProvider.needsExecutor()) { - transportChannelProvider = transportChannelProvider.withExecutor((Executor) executor); + // After needsExecutor and StubSettings#setExecutor are deprecated, transport channel executor + // can only be set from TransportChannelProvider#withExecutor directly, and all providers will + // have default executors. + if (transportChannelProvider.needsExecutor() && executor != null) { + transportChannelProvider = transportChannelProvider.withExecutor(executor); } Map headers = getHeadersFromSettings(settings); if (transportChannelProvider.needsHeaders()) { @@ -186,7 +193,7 @@ public static ClientContext create(StubSettings settings) throws IOException { watchdogProvider = watchdogProvider.withClock(clock); } if (watchdogProvider.needsExecutor()) { - watchdogProvider = watchdogProvider.withExecutor(executor); + watchdogProvider = watchdogProvider.withExecutor(workerExecutor); } watchdog = watchdogProvider.getWatchdog(); } @@ -196,8 +203,8 @@ public static ClientContext create(StubSettings settings) throws IOException { if (transportChannelProvider.shouldAutoClose()) { backgroundResources.add(transportChannel); } - if (executorProvider.shouldAutoClose()) { - backgroundResources.add(new ExecutorAsBackgroundResource(executor)); + if (workerExecutorProvider.shouldAutoClose()) { + backgroundResources.add(new ExecutorAsBackgroundResource(workerExecutor)); } if (watchdogProvider != null && watchdogProvider.shouldAutoClose()) { backgroundResources.add(watchdog); @@ -205,7 +212,7 @@ public static ClientContext create(StubSettings settings) throws IOException { return newBuilder() .setBackgroundResources(backgroundResources.build()) - .setExecutor(executor) + .setExecutor(workerExecutor) .setCredentials(credentials) .setTransportChannel(transportChannel) .setHeaders(ImmutableMap.copyOf(settings.getHeaderProvider().getHeaders())) diff --git a/gax/src/main/java/com/google/api/gax/rpc/ClientSettings.java b/gax/src/main/java/com/google/api/gax/rpc/ClientSettings.java index 7575fea2b4..d532f75d6c 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/ClientSettings.java +++ b/gax/src/main/java/com/google/api/gax/rpc/ClientSettings.java @@ -64,7 +64,7 @@ public final StubSettings getStubSettings() { } public final ExecutorProvider getExecutorProvider() { - return stubSettings.getExecutorProvider(); + return stubSettings.getWorkerExecutorProvider(); } public final TransportChannelProvider getTransportChannelProvider() { @@ -162,6 +162,7 @@ protected StubSettings.Builder getStubSettings() { */ public B setExecutorProvider(ExecutorProvider executorProvider) { stubSettings.setExecutorProvider(executorProvider); + stubSettings.setWorkerExecutorProvider(executorProvider); return self(); } @@ -240,7 +241,7 @@ public B setWatchdogCheckInterval(@Nullable Duration checkInterval) { /** Gets the ExecutorProvider that was previously set on this Builder. */ public ExecutorProvider getExecutorProvider() { - return stubSettings.getExecutorProvider(); + return stubSettings.getWorkerExecutorProvider(); } /** Gets the TransportProvider that was previously set on this Builder. */ diff --git a/gax/src/main/java/com/google/api/gax/rpc/StubSettings.java b/gax/src/main/java/com/google/api/gax/rpc/StubSettings.java index 1b6090107d..c2076c4f99 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/StubSettings.java +++ b/gax/src/main/java/com/google/api/gax/rpc/StubSettings.java @@ -45,6 +45,7 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import java.io.IOException; +import java.util.concurrent.Executor; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.threeten.bp.Duration; @@ -64,6 +65,7 @@ public abstract class StubSettings> { static final String QUOTA_PROJECT_ID_HEADER_KEY = "x-goog-user-project"; private final ExecutorProvider executorProvider; + private final ExecutorProvider workerExecutorProvider; private final CredentialsProvider credentialsProvider; private final HeaderProvider headerProvider; private final HeaderProvider internalHeaderProvider; @@ -78,6 +80,7 @@ public abstract class StubSettings> { /** Constructs an instance of StubSettings. */ protected StubSettings(Builder builder) { this.executorProvider = builder.executorProvider; + this.workerExecutorProvider = builder.workerExecutorProvider; this.transportChannelProvider = builder.transportChannelProvider; this.credentialsProvider = builder.credentialsProvider; this.headerProvider = builder.headerProvider; @@ -90,10 +93,16 @@ protected StubSettings(Builder builder) { this.tracerFactory = builder.tracerFactory; } + /** @deprecated Please use {@link #getWorkerExecutorProvider()}. */ + @Deprecated public final ExecutorProvider getExecutorProvider() { return executorProvider; } + public final ExecutorProvider getWorkerExecutorProvider() { + return workerExecutorProvider; + } + public final TransportChannelProvider getTransportChannelProvider() { return transportChannelProvider; } @@ -149,6 +158,7 @@ public ApiTracerFactory getTracerFactory() { public String toString() { return MoreObjects.toStringHelper(this) .add("executorProvider", executorProvider) + .add("workerExecutorProvider", workerExecutorProvider) .add("transportChannelProvider", transportChannelProvider) .add("credentialsProvider", credentialsProvider) .add("headerProvider", headerProvider) @@ -168,6 +178,7 @@ public abstract static class Builder< SettingsT extends StubSettings, B extends Builder> { private ExecutorProvider executorProvider; + private ExecutorProvider workerExecutorProvider; private CredentialsProvider credentialsProvider; private HeaderProvider headerProvider; private HeaderProvider internalHeaderProvider; @@ -182,6 +193,7 @@ public abstract static class Builder< /** Create a builder from a StubSettings object. */ protected Builder(StubSettings settings) { this.executorProvider = settings.executorProvider; + this.workerExecutorProvider = settings.workerExecutorProvider; this.transportChannelProvider = settings.transportChannelProvider; this.credentialsProvider = settings.credentialsProvider; this.headerProvider = settings.headerProvider; @@ -213,7 +225,8 @@ private static String getQuotaProjectIdFromClientContext(ClientContext clientCon protected Builder(ClientContext clientContext) { if (clientContext == null) { - this.executorProvider = InstantiatingExecutorProvider.newBuilder().build(); + this.executorProvider = null; + this.workerExecutorProvider = InstantiatingExecutorProvider.newBuilder().build(); this.transportChannelProvider = null; this.credentialsProvider = NoCredentialsProvider.create(); this.headerProvider = new NoHeaderProvider(); @@ -226,6 +239,7 @@ protected Builder(ClientContext clientContext) { this.tracerFactory = NoopApiTracerFactory.getInstance(); } else { this.executorProvider = FixedExecutorProvider.create(clientContext.getExecutor()); + this.workerExecutorProvider = FixedExecutorProvider.create(clientContext.getExecutor()); this.transportChannelProvider = FixedTransportChannelProvider.create(clientContext.getTransportChannel()); this.credentialsProvider = FixedCredentialsProvider.create(clientContext.getCredentials()); @@ -252,13 +266,29 @@ protected B self() { } /** - * Sets the ExecutorProvider to use for getting the executor to use for running asynchronous API - * call logic (such as retries and long-running operations), and also to pass to the transport - * settings if an executor is needed for the transport and it doesn't have its own executor - * provider. + * @deprecated Please use {@link #setWorkerExecutorProvider(ExecutorProvider)} for setting + * executor to use for running asynchronous API call logic. To set executor for {@link + * TransportChannelProvider}, please use {@link + * TransportChannelProvider#withExecutor(Executor)} instead. */ + @Deprecated public B setExecutorProvider(ExecutorProvider executorProvider) { + // For backward compatibility, this will set both executorProvider and workerExecutorProvider. + // ExecutorProvider is null by default. In ClientContext#create(), if TransportChannelProvider + // doesn't have an executor, executorProvider will be used as TransportChannelProvider's + // executor. After this method is deprecated, TransportChannelProvider's executor can only be + // set with TransportChannelProvider#withExecutor. this.executorProvider = executorProvider; + this.workerExecutorProvider = executorProvider; + return self(); + } + + /** + * Sets the executor to use for running asynchronous API call logic (such as retries and + * long-running operations). + */ + public B setWorkerExecutorProvider(ExecutorProvider workerExecutorProvider) { + this.workerExecutorProvider = workerExecutorProvider; return self(); } @@ -365,11 +395,17 @@ public B setTracerFactory(@Nonnull ApiTracerFactory tracerFactory) { return self(); } - /** Gets the ExecutorProvider that was previously set on this Builder. */ + /** @deprecated Please use {@link #getWorkerExecutorProvider()}. */ + @Deprecated public ExecutorProvider getExecutorProvider() { return executorProvider; } + /** Gets the ExecutorProvider that was previously set on this Builder. */ + public ExecutorProvider getWorkerExecutorProvider() { + return workerExecutorProvider; + } + /** Gets the TransportProvider that was previously set on this Builder. */ public TransportChannelProvider getTransportChannelProvider() { return transportChannelProvider; @@ -439,6 +475,7 @@ protected static void applyToAllUnaryMethods( public String toString() { return MoreObjects.toStringHelper(this) .add("executorProvider", executorProvider) + .add("workerExecutorProvider", workerExecutorProvider) .add("transportChannelProvider", transportChannelProvider) .add("credentialsProvider", credentialsProvider) .add("headerProvider", headerProvider) diff --git a/gax/src/main/java/com/google/api/gax/rpc/TransportChannelProvider.java b/gax/src/main/java/com/google/api/gax/rpc/TransportChannelProvider.java index 3cfc9349e3..d754c371ea 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/TransportChannelProvider.java +++ b/gax/src/main/java/com/google/api/gax/rpc/TransportChannelProvider.java @@ -64,6 +64,7 @@ public interface TransportChannelProvider { boolean shouldAutoClose(); /** True if the TransportProvider needs an executor. */ + @Deprecated boolean needsExecutor(); /** diff --git a/gax/src/test/java/com/google/api/gax/rpc/ClientContextTest.java b/gax/src/test/java/com/google/api/gax/rpc/ClientContextTest.java index 174b351648..f334cb6d68 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/ClientContextTest.java +++ b/gax/src/test/java/com/google/api/gax/rpc/ClientContextTest.java @@ -168,12 +168,10 @@ public TransportChannelProvider withPoolSize(int size) { @Override public TransportChannel getTransportChannel() throws IOException { - if (needsExecutor()) { - throw new IllegalStateException("Needs Executor"); - } if (needsCredentials()) { throw new IllegalStateException("Needs Credentials"); } + transport.setExecutor(executor); return transport; } @@ -597,4 +595,56 @@ public void testUserAgentConcat() throws Exception { assertThat(transportChannel.getHeaders()) .containsEntry("user-agent", "user-supplied-agent internal-agent"); } + + @Test + public void testExecutorSettings() throws Exception { + TransportChannelProvider transportChannelProvider = + new FakeTransportProvider( + FakeTransportChannel.create(new FakeChannel()), null, true, null, null); + + ClientSettings.Builder builder = + new FakeClientSettings.Builder() + .setTransportChannelProvider(transportChannelProvider) + .setCredentialsProvider( + FixedCredentialsProvider.create(Mockito.mock(GoogleCredentials.class))); + + // By default, if executor is not set, channel provider should not have an executor set + ClientContext context = ClientContext.create(builder.build()); + FakeTransportChannel transportChannel = (FakeTransportChannel) context.getTransportChannel(); + assertThat(transportChannel.getExecutor()).isNull(); + + ExecutorProvider channelExecutorProvider = + FixedExecutorProvider.create(Mockito.mock(ScheduledExecutorService.class)); + builder.setTransportChannelProvider( + transportChannelProvider.withExecutor((Executor) channelExecutorProvider.getExecutor())); + context = ClientContext.create(builder.build()); + transportChannel = (FakeTransportChannel) context.getTransportChannel(); + assertThat(transportChannel.getExecutor()) + .isSameInstanceAs(channelExecutorProvider.getExecutor()); + + ExecutorProvider executorProvider = + FixedExecutorProvider.create(Mockito.mock(ScheduledExecutorService.class)); + assertThat(channelExecutorProvider.getExecutor()) + .isNotSameInstanceAs(executorProvider.getExecutor()); + + // For backward compatibility, if executor is set from stubSettings.setExecutor, and transport + // channel already has an executor, the ExecutorProvider set in stubSettings won't override + // transport channel's executor + builder.setExecutorProvider(executorProvider); + context = ClientContext.create(builder.build()); + transportChannel = (FakeTransportChannel) context.getTransportChannel(); + assertThat(transportChannel.getExecutor()) + .isSameInstanceAs(channelExecutorProvider.getExecutor()); + + // For backward compatibility, if executor is set from stubSettings.setExecutor, and transport + // channel doesn't have an executor, transport channel will get the executor from + // stubSettings.setExecutor + builder.setExecutorProvider(executorProvider); + builder.setTransportChannelProvider( + new FakeTransportProvider( + FakeTransportChannel.create(new FakeChannel()), null, true, null, null)); + context = ClientContext.create(builder.build()); + transportChannel = (FakeTransportChannel) context.getTransportChannel(); + assertThat(transportChannel.getExecutor()).isSameInstanceAs(executorProvider.getExecutor()); + } } diff --git a/gax/src/test/java/com/google/api/gax/rpc/ClientSettingsTest.java b/gax/src/test/java/com/google/api/gax/rpc/ClientSettingsTest.java index d777f506a7..87a73b1310 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/ClientSettingsTest.java +++ b/gax/src/test/java/com/google/api/gax/rpc/ClientSettingsTest.java @@ -111,6 +111,8 @@ public void testEmptyBuilder() throws Exception { FakeClientSettings.Builder builder = new FakeClientSettings.Builder(); Truth.assertThat(builder.getExecutorProvider()) .isInstanceOf(InstantiatingExecutorProvider.class); + Truth.assertThat(builder.getStubSettings().getWorkerExecutorProvider()) + .isSameInstanceAs(builder.getExecutorProvider()); Truth.assertThat(builder.getTransportChannelProvider()).isNull(); Truth.assertThat(builder.getCredentialsProvider()).isInstanceOf(NoCredentialsProvider.class); Truth.assertThat(builder.getClock()).isInstanceOf(NanoClock.class); @@ -124,6 +126,8 @@ public void testEmptyBuilder() throws Exception { FakeClientSettings settings = builder.build(); Truth.assertThat(settings.getExecutorProvider()) .isSameInstanceAs(builder.getExecutorProvider()); + Truth.assertThat(settings.getStubSettings().getWorkerExecutorProvider()) + .isSameInstanceAs(settings.getExecutorProvider()); Truth.assertThat(settings.getTransportChannelProvider()) .isSameInstanceAs(builder.getTransportChannelProvider()); Truth.assertThat(settings.getCredentialsProvider()) @@ -173,6 +177,8 @@ public void testBuilder() throws Exception { builder.setQuotaProjectId(quotaProjectId); Truth.assertThat(builder.getExecutorProvider()).isSameInstanceAs(executorProvider); + Truth.assertThat(builder.getStubSettings().getWorkerExecutorProvider()) + .isSameInstanceAs(executorProvider); Truth.assertThat(builder.getTransportChannelProvider()).isSameInstanceAs(transportProvider); Truth.assertThat(builder.getCredentialsProvider()).isSameInstanceAs(credentialsProvider); Truth.assertThat(builder.getClock()).isSameInstanceAs(clock); @@ -223,6 +229,8 @@ public void testBuilderFromClientContext() throws Exception { FakeClientSettings.Builder builder = new FakeClientSettings.Builder(clientContext); Truth.assertThat(builder.getExecutorProvider()).isInstanceOf(FixedExecutorProvider.class); + Truth.assertThat(builder.getStubSettings().getWorkerExecutorProvider()) + .isSameInstanceAs(builder.getExecutorProvider()); Truth.assertThat(builder.getTransportChannelProvider()) .isInstanceOf(FixedTransportChannelProvider.class); Truth.assertThat(builder.getCredentialsProvider()).isInstanceOf(FixedCredentialsProvider.class); @@ -263,6 +271,8 @@ public void testBuilderFromSettings() throws Exception { FakeClientSettings.Builder newBuilder = new FakeClientSettings.Builder(settings); Truth.assertThat(newBuilder.getExecutorProvider()).isSameInstanceAs(executorProvider); + Truth.assertThat(newBuilder.getStubSettings().getWorkerExecutorProvider()) + .isSameInstanceAs(executorProvider); Truth.assertThat(newBuilder.getTransportChannelProvider()).isSameInstanceAs(transportProvider); Truth.assertThat(newBuilder.getCredentialsProvider()).isSameInstanceAs(credentialsProvider); Truth.assertThat(newBuilder.getClock()).isSameInstanceAs(clock); diff --git a/gax/src/test/java/com/google/api/gax/rpc/testing/FakeTransportChannel.java b/gax/src/test/java/com/google/api/gax/rpc/testing/FakeTransportChannel.java index b6c56a7012..0d4abac8f1 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/testing/FakeTransportChannel.java +++ b/gax/src/test/java/com/google/api/gax/rpc/testing/FakeTransportChannel.java @@ -32,6 +32,7 @@ import com.google.api.core.InternalApi; import com.google.api.gax.rpc.TransportChannel; import java.util.Map; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @InternalApi("for testing") @@ -39,6 +40,7 @@ public class FakeTransportChannel implements TransportChannel { private final FakeChannel channel; private volatile boolean isShutdown = false; private volatile Map headers; + private volatile Executor executor; private FakeTransportChannel(FakeChannel channel) { this.channel = channel; @@ -102,4 +104,12 @@ public void setHeaders(Map headers) { public Map getHeaders() { return this.headers; } + + public void setExecutor(Executor executor) { + this.executor = executor; + } + + public Executor getExecutor() { + return executor; + } }