diff --git a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProvider.java b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProvider.java index 57b0ff1284..c9dd85442e 100644 --- a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProvider.java +++ b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProvider.java @@ -186,6 +186,8 @@ private HttpJsonTransportChannel createChannel() throws IOException, GeneralSecu httpTransportToUse = createHttpTransport(); } + // Pass the executor to the ManagedChannel. If no executor was provided (or null), + // the channel will use a default executor for the calls. ManagedHttpJsonChannel channel = ManagedHttpJsonChannel.newBuilder() .setEndpoint(endpoint) diff --git a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java index e6fe38d5ef..22e333752a 100644 --- a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java +++ b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java @@ -34,6 +34,7 @@ import com.google.api.core.BetaApi; import com.google.api.gax.core.BackgroundResource; import com.google.api.gax.core.InstantiatingExecutorProvider; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.io.IOException; import java.util.concurrent.Executor; @@ -47,23 +48,24 @@ @BetaApi public class ManagedHttpJsonChannel implements HttpJsonChannel, BackgroundResource { - private static final ExecutorService DEFAULT_EXECUTOR = - InstantiatingExecutorProvider.newBuilder().build().getExecutor(); - private final Executor executor; + private final boolean usingDefaultExecutor; private final String endpoint; private final HttpTransport httpTransport; private final ScheduledExecutorService deadlineScheduledExecutorService; - private boolean isTransportShutdown; protected ManagedHttpJsonChannel() { - this(null, null, null); + this(null, true, null, null); } private ManagedHttpJsonChannel( - Executor executor, String endpoint, @Nullable HttpTransport httpTransport) { + Executor executor, + boolean usingDefaultExecutor, + String endpoint, + @Nullable HttpTransport httpTransport) { this.executor = executor; + this.usingDefaultExecutor = usingDefaultExecutor; this.endpoint = endpoint; this.httpTransport = httpTransport == null ? new NetHttpTransport() : httpTransport; this.deadlineScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); @@ -82,46 +84,105 @@ public HttpJsonClientCall newCall( deadlineScheduledExecutorService); } + @VisibleForTesting + Executor getExecutor() { + return executor; + } + @Override public synchronized void shutdown() { + // Calling shutdown/ shutdownNow() twice should no-op if (isTransportShutdown) { return; } try { + // Only shutdown the executor if it was created by Gax. External executors + // should be managed by the user. + if (shouldManageExecutor()) { + ((ExecutorService) executor).shutdown(); + } deadlineScheduledExecutorService.shutdown(); httpTransport.shutdown(); isTransportShutdown = true; } catch (IOException e) { - e.printStackTrace(); + // TODO: Log this scenario once we implemented the Cloud SDK logging. + // Swallow error if httpTransport shutdown fails } } @Override public boolean isShutdown() { - return isTransportShutdown; + // TODO(lawrenceqiu): Expose an isShutdown() method for HttpTransport + boolean isShutdown = isTransportShutdown && deadlineScheduledExecutorService.isShutdown(); + // Check that the Gax's ExecutorService is shutdown as well + if (shouldManageExecutor()) { + isShutdown = isShutdown && ((ExecutorService) executor).isShutdown(); + } + return isShutdown; } @Override public boolean isTerminated() { - return isTransportShutdown; + boolean isTerminated = deadlineScheduledExecutorService.isTerminated(); + // Check that the Gax's ExecutorService is terminated as well + if (shouldManageExecutor()) { + isTerminated = isTerminated && ((ExecutorService) executor).isTerminated(); + } + return isTerminated; } @Override public void shutdownNow() { - shutdown(); + // Calling shutdown/ shutdownNow() twice should no-op + if (isTransportShutdown) { + return; + } + try { + // Only shutdown the executor if it was created by Gax. External executors + // should be managed by the user. + if (shouldManageExecutor()) { + ((ExecutorService) executor).shutdownNow(); + } + deadlineScheduledExecutorService.shutdownNow(); + httpTransport.shutdown(); + isTransportShutdown = true; + } catch (IOException e) { + // TODO: Log this scenario once we implemented the Cloud SDK logging. + // Swallow error if httpTransport shutdown fails + } } @Override public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException { - // TODO - return false; + long endTimeNanos = System.nanoTime() + unit.toNanos(duration); + long awaitTimeNanos = endTimeNanos - System.nanoTime(); + if (awaitTimeNanos <= 0) { + return false; + } + // Only awaitTermination for the executor if it was created by Gax. External executors + // should be managed by the user. + if (usingDefaultExecutor && executor instanceof ExecutorService) { + boolean terminated = ((ExecutorService) executor).awaitTermination(awaitTimeNanos, unit); + // Termination duration has elapsed + if (!terminated) { + return false; + } + } + awaitTimeNanos = endTimeNanos - System.nanoTime(); + return deadlineScheduledExecutorService.awaitTermination(awaitTimeNanos, unit); + } + + private boolean shouldManageExecutor() { + return usingDefaultExecutor && executor instanceof ExecutorService; } @Override - public void close() {} + public void close() { + shutdown(); + } public static Builder newBuilder() { - return new Builder().setExecutor(DEFAULT_EXECUTOR); + return new Builder(); } public static class Builder { @@ -129,11 +190,14 @@ public static class Builder { private Executor executor; private String endpoint; private HttpTransport httpTransport; + private boolean usingDefaultExecutor; - private Builder() {} + private Builder() { + this.usingDefaultExecutor = false; + } public Builder setExecutor(Executor executor) { - this.executor = executor == null ? DEFAULT_EXECUTOR : executor; + this.executor = executor; return this; } @@ -150,8 +214,20 @@ public Builder setHttpTransport(HttpTransport httpTransport) { public ManagedHttpJsonChannel build() { Preconditions.checkNotNull(endpoint); + // If the executor provided for this channel is null, gax will provide a + // default executor to used for the calls. Only the default executor's + // lifecycle will be managed by the channel. Any external executor needs to + // managed by the user. + if (executor == null) { + executor = InstantiatingExecutorProvider.newIOBuilder().build().getExecutor(); + usingDefaultExecutor = true; + } + return new ManagedHttpJsonChannel( - executor, endpoint, httpTransport == null ? new NetHttpTransport() : httpTransport); + executor, + usingDefaultExecutor, + endpoint, + httpTransport == null ? new NetHttpTransport() : httpTransport); } } } diff --git a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonInterceptorChannel.java b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonInterceptorChannel.java index 05321e2fd7..56ab5a5abb 100644 --- a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonInterceptorChannel.java +++ b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonInterceptorChannel.java @@ -30,6 +30,7 @@ package com.google.api.gax.httpjson; import com.google.api.core.BetaApi; +import com.google.common.annotations.VisibleForTesting; import java.util.concurrent.TimeUnit; @BetaApi @@ -45,6 +46,11 @@ class ManagedHttpJsonInterceptorChannel extends ManagedHttpJsonChannel { this.interceptor = interceptor; } + @VisibleForTesting + ManagedHttpJsonChannel getChannel() { + return channel; + } + @Override public HttpJsonClientCall newCall( ApiMethodDescriptor methodDescriptor, HttpJsonCallOptions callOptions) { diff --git a/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java b/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java index 4061ff8bad..416fcfb2da 100644 --- a/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java +++ b/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java @@ -39,6 +39,7 @@ import java.io.IOException; import java.security.GeneralSecurityException; import java.util.Collections; +import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -50,20 +51,22 @@ @RunWith(JUnit4.class) public class InstantiatingHttpJsonChannelProviderTest extends AbstractMtlsTransportChannelTest { + private static final String DEFAULT_ENDPOINT = "localhost:8080"; + private static final Map DEFAULT_HEADER_MAP = Collections.emptyMap(); + @Test public void basicTest() throws IOException { - String endpoint = "localhost:8080"; ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1); executor.shutdown(); TransportChannelProvider provider = InstantiatingHttpJsonChannelProvider.newBuilder().build(); assertThat(provider.needsEndpoint()).isTrue(); - provider = provider.withEndpoint(endpoint); + provider = provider.withEndpoint(DEFAULT_ENDPOINT); assertThat(provider.needsEndpoint()).isFalse(); assertThat(provider.needsHeaders()).isTrue(); - provider = provider.withHeaders(Collections.emptyMap()); + provider = provider.withHeaders(DEFAULT_HEADER_MAP); assertThat(provider.needsHeaders()).isFalse(); // Make sure getTransportChannel works without setting executor @@ -103,6 +106,57 @@ public void basicTest() throws IOException { provider.getTransportChannel().shutdownNow(); } + // Ensure that a default executor is created by the ManagedHttpJsonChannel even + // if not provided by the TransportChannelProvider + @Test + public void managedChannelUsesDefaultChannelExecutor() throws IOException { + InstantiatingHttpJsonChannelProvider instantiatingHttpJsonChannelProvider = + InstantiatingHttpJsonChannelProvider.newBuilder().setEndpoint(DEFAULT_ENDPOINT).build(); + instantiatingHttpJsonChannelProvider = + (InstantiatingHttpJsonChannelProvider) + instantiatingHttpJsonChannelProvider.withHeaders(DEFAULT_HEADER_MAP); + HttpJsonTransportChannel httpJsonTransportChannel = + instantiatingHttpJsonChannelProvider.getTransportChannel(); + + // By default, the channel will be wrapped with ManagedHttpJsonInterceptorChannel + ManagedHttpJsonInterceptorChannel interceptorChannel = + (ManagedHttpJsonInterceptorChannel) httpJsonTransportChannel.getManagedChannel(); + ManagedHttpJsonChannel managedHttpJsonChannel = interceptorChannel.getChannel(); + assertThat(managedHttpJsonChannel.getExecutor()).isNotNull(); + + // Clean up the resources (executor, deadlineScheduler, httpTransport) + instantiatingHttpJsonChannelProvider.getTransportChannel().shutdownNow(); + } + + // Ensure that the user's executor is used by the ManagedHttpJsonChannel + @Test + public void managedChannelUsesCustomExecutor() throws IOException { + // Custom executor to use -- Lifecycle must be managed by this test + ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1); + executor.shutdown(); + + InstantiatingHttpJsonChannelProvider instantiatingHttpJsonChannelProvider = + InstantiatingHttpJsonChannelProvider.newBuilder() + .setEndpoint(DEFAULT_ENDPOINT) + .setExecutor(executor) + .build(); + instantiatingHttpJsonChannelProvider = + (InstantiatingHttpJsonChannelProvider) + instantiatingHttpJsonChannelProvider.withHeaders(DEFAULT_HEADER_MAP); + HttpJsonTransportChannel httpJsonTransportChannel = + instantiatingHttpJsonChannelProvider.getTransportChannel(); + + // By default, the channel will be wrapped with ManagedHttpJsonInterceptorChannel + ManagedHttpJsonInterceptorChannel interceptorChannel = + (ManagedHttpJsonInterceptorChannel) httpJsonTransportChannel.getManagedChannel(); + ManagedHttpJsonChannel managedHttpJsonChannel = interceptorChannel.getChannel(); + assertThat(managedHttpJsonChannel.getExecutor()).isNotNull(); + assertThat(managedHttpJsonChannel.getExecutor()).isEqualTo(executor); + + // Clean up the resources (executor, deadlineScheduler, httpTransport) + instantiatingHttpJsonChannelProvider.getTransportChannel().shutdownNow(); + } + @Override protected Object getMtlsObjectFromTransportChannel(MtlsProvider provider) throws IOException, GeneralSecurityException { diff --git a/gax-java/gax/src/main/java/com/google/api/gax/core/InstantiatingExecutorProvider.java b/gax-java/gax/src/main/java/com/google/api/gax/core/InstantiatingExecutorProvider.java index c91b044cb9..1faee91a4a 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/core/InstantiatingExecutorProvider.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/core/InstantiatingExecutorProvider.java @@ -54,6 +54,9 @@ public Thread newThread(Runnable runnable) { return thread; } }; + private static final int MIN_THREAD_AMOUNT = 4; + // Attempt to choose a reasonable default core pool multiplier for IO Bound operations + private static final int IO_THREAD_MULTIPLIER = 50; // Package-private constructor prevents others from subclassing. InstantiatingExecutorProvider() {} @@ -76,9 +79,22 @@ public boolean shouldAutoClose() { public abstract Builder toBuilder(); + // Used for CPU Bound tasks as the thread count is at max the number of processors + // Thread count minimum is at least `MIN_CPU_AMOUNT` public static Builder newBuilder() { int numCpus = Runtime.getRuntime().availableProcessors(); - int numThreads = Math.max(4, numCpus); + int numThreads = Math.max(MIN_THREAD_AMOUNT, numCpus); + + return new AutoValue_InstantiatingExecutorProvider.Builder() + .setExecutorThreadCount(numThreads) + .setThreadFactory(DEFAULT_THREAD_FACTORY); + } + + // Used for IO Bound tasks as the thread count scales with the number of processors + // Thread count minimum is at least `MIN_CPU_AMOUNT` * `IO_THREAD_MULTIPLIER` + public static Builder newIOBuilder() { + int numCpus = Runtime.getRuntime().availableProcessors(); + int numThreads = IO_THREAD_MULTIPLIER * Math.max(MIN_THREAD_AMOUNT, numCpus); return new AutoValue_InstantiatingExecutorProvider.Builder() .setExecutorThreadCount(numThreads)