From a16bc6a6b4b250091b7fa5245fc087fc0f3c684e Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Fri, 12 May 2023 14:28:44 -0400 Subject: [PATCH 01/13] feat: Implement awaitTermination() for MangedHttpJsonChannel --- .../gax/httpjson/ManagedHttpJsonChannel.java | 66 +++++++++++++++---- 1 file changed, 52 insertions(+), 14 deletions(-) diff --git a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java index e6fe38d5ef..21e553c7c5 100644 --- a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java +++ b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java @@ -47,23 +47,24 @@ @BetaApi public class ManagedHttpJsonChannel implements HttpJsonChannel, BackgroundResource { - private static final ExecutorService DEFAULT_EXECUTOR = - InstantiatingExecutorProvider.newBuilder().build().getExecutor(); - private final Executor executor; + private final boolean usingDefaultExecutor; private final String endpoint; private final HttpTransport httpTransport; private final ScheduledExecutorService deadlineScheduledExecutorService; - private boolean isTransportShutdown; protected ManagedHttpJsonChannel() { - this(null, null, null); + this(null, true, null, null); } private ManagedHttpJsonChannel( - Executor executor, String endpoint, @Nullable HttpTransport httpTransport) { + Executor executor, + boolean usingDefaultExecutor, + String endpoint, + @Nullable HttpTransport httpTransport) { this.executor = executor; + this.usingDefaultExecutor = usingDefaultExecutor; this.endpoint = endpoint; this.httpTransport = httpTransport == null ? new NetHttpTransport() : httpTransport; this.deadlineScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); @@ -84,10 +85,16 @@ public HttpJsonClientCall newCall( @Override public synchronized void shutdown() { + // Calling shutdown() twice should no-op if (isTransportShutdown) { return; } try { + // Only shutdown the executor if it was created by Gax. External executors + // should be managed by the user. + if (usingDefaultExecutor) { + ((ExecutorService) executor).shutdown(); + } deadlineScheduledExecutorService.shutdown(); httpTransport.shutdown(); isTransportShutdown = true; @@ -98,12 +105,22 @@ public synchronized void shutdown() { @Override public boolean isShutdown() { - return isTransportShutdown; + boolean isShutdown = deadlineScheduledExecutorService.isShutdown(); + // Check that the Gax's ExecutorService is shutdown as well + if (usingDefaultExecutor) { + isShutdown = isShutdown && ((ExecutorService) executor).isShutdown(); + } + return isShutdown; } @Override public boolean isTerminated() { - return isTransportShutdown; + boolean isTerminated = deadlineScheduledExecutorService.isTerminated(); + // Check that the Gax's ExecutorService is terminated as well + if (usingDefaultExecutor) { + isTerminated = isTerminated && ((ExecutorService) executor).isTerminated(); + } + return isTerminated; } @Override @@ -113,15 +130,28 @@ public void shutdownNow() { @Override public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException { - // TODO - return false; + long endTimeNanos = System.nanoTime() + unit.toNanos(duration); + long awaitTimeNanos = endTimeNanos - System.nanoTime(); + // Only awaitTermination for the executor if it was created by Gax. External executors + // should be managed by the user. + if (usingDefaultExecutor && awaitTimeNanos > 0) { + boolean terminated = ((ExecutorService) executor).awaitTermination(awaitTimeNanos, unit); + // Termination duration has elapsed + if (!terminated) { + return false; + } + } + awaitTimeNanos = endTimeNanos - System.nanoTime(); + return deadlineScheduledExecutorService.awaitTermination(awaitTimeNanos, unit); } @Override - public void close() {} + public void close() { + shutdown(); + } public static Builder newBuilder() { - return new Builder().setExecutor(DEFAULT_EXECUTOR); + return new Builder(); } public static class Builder { @@ -133,7 +163,7 @@ public static class Builder { private Builder() {} public Builder setExecutor(Executor executor) { - this.executor = executor == null ? DEFAULT_EXECUTOR : executor; + this.executor = executor; return this; } @@ -150,8 +180,16 @@ public Builder setHttpTransport(HttpTransport httpTransport) { public ManagedHttpJsonChannel build() { Preconditions.checkNotNull(endpoint); + boolean usingDefaultExecutor = executor == null; + if (usingDefaultExecutor) { + executor = InstantiatingExecutorProvider.newBuilder().build().getExecutor(); + } + return new ManagedHttpJsonChannel( - executor, endpoint, httpTransport == null ? new NetHttpTransport() : httpTransport); + executor, + usingDefaultExecutor, + endpoint, + httpTransport == null ? new NetHttpTransport() : httpTransport); } } } From 2eb4f6934aae0fc6efd557ee1c60a796fb0e0795 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Fri, 12 May 2023 14:53:26 -0400 Subject: [PATCH 02/13] chore: Check if there is time to await first --- .../com/google/api/gax/httpjson/ManagedHttpJsonChannel.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java index 21e553c7c5..43ee74efe7 100644 --- a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java +++ b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java @@ -132,9 +132,12 @@ public void shutdownNow() { public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException { long endTimeNanos = System.nanoTime() + unit.toNanos(duration); long awaitTimeNanos = endTimeNanos - System.nanoTime(); + if (awaitTimeNanos <= 0) { + return false; + } // Only awaitTermination for the executor if it was created by Gax. External executors // should be managed by the user. - if (usingDefaultExecutor && awaitTimeNanos > 0) { + if (usingDefaultExecutor) { boolean terminated = ((ExecutorService) executor).awaitTermination(awaitTimeNanos, unit); // Termination duration has elapsed if (!terminated) { From 08632ed24a55a640a0e02c80ef56c3ba9bfd4a4e Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Mon, 15 May 2023 13:43:43 -0400 Subject: [PATCH 03/13] chore: Check that transport is shutdown as well --- .../com/google/api/gax/httpjson/ManagedHttpJsonChannel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java index 43ee74efe7..aee333158d 100644 --- a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java +++ b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java @@ -105,7 +105,7 @@ public synchronized void shutdown() { @Override public boolean isShutdown() { - boolean isShutdown = deadlineScheduledExecutorService.isShutdown(); + boolean isShutdown = isTransportShutdown && deadlineScheduledExecutorService.isShutdown(); // Check that the Gax's ExecutorService is shutdown as well if (usingDefaultExecutor) { isShutdown = isShutdown && ((ExecutorService) executor).isShutdown(); From 7eaad4c5f5dd37415209831c8332e815d2e438c2 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Thu, 18 May 2023 10:08:17 -0400 Subject: [PATCH 04/13] chore: Add comments for executor behavior --- .../gax/httpjson/InstantiatingHttpJsonChannelProvider.java | 2 ++ .../com/google/api/gax/httpjson/ManagedHttpJsonChannel.java | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProvider.java b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProvider.java index 57b0ff1284..c9dd85442e 100644 --- a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProvider.java +++ b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProvider.java @@ -186,6 +186,8 @@ private HttpJsonTransportChannel createChannel() throws IOException, GeneralSecu httpTransportToUse = createHttpTransport(); } + // Pass the executor to the ManagedChannel. If no executor was provided (or null), + // the channel will use a default executor for the calls. ManagedHttpJsonChannel channel = ManagedHttpJsonChannel.newBuilder() .setEndpoint(endpoint) diff --git a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java index aee333158d..7d59a51397 100644 --- a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java +++ b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java @@ -183,6 +183,10 @@ public Builder setHttpTransport(HttpTransport httpTransport) { public ManagedHttpJsonChannel build() { Preconditions.checkNotNull(endpoint); + // If the executor provided for this channel is null, gax will provide a + // default executor to used for the calls. Only the default executor's + // lifecycle will be managed by the channel. Any external executor needs to + // managed by the user. boolean usingDefaultExecutor = executor == null; if (usingDefaultExecutor) { executor = InstantiatingExecutorProvider.newBuilder().build().getExecutor(); From 40c6d272d08385d80fcded7b8a2d35087a57fd0e Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Thu, 18 May 2023 17:02:19 -0400 Subject: [PATCH 05/13] chore: Add type cast for default executor --- .../google/api/gax/httpjson/ManagedHttpJsonChannel.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java index 7d59a51397..98da25a33f 100644 --- a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java +++ b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java @@ -92,7 +92,7 @@ public synchronized void shutdown() { try { // Only shutdown the executor if it was created by Gax. External executors // should be managed by the user. - if (usingDefaultExecutor) { + if (usingDefaultExecutor && executor instanceof ExecutorService) { ((ExecutorService) executor).shutdown(); } deadlineScheduledExecutorService.shutdown(); @@ -105,9 +105,10 @@ public synchronized void shutdown() { @Override public boolean isShutdown() { + // TODO(lawrenceqiu): Expose an isShutdown() method for HttpTransport boolean isShutdown = isTransportShutdown && deadlineScheduledExecutorService.isShutdown(); // Check that the Gax's ExecutorService is shutdown as well - if (usingDefaultExecutor) { + if (usingDefaultExecutor && executor instanceof ExecutorService) { isShutdown = isShutdown && ((ExecutorService) executor).isShutdown(); } return isShutdown; @@ -117,7 +118,7 @@ public boolean isShutdown() { public boolean isTerminated() { boolean isTerminated = deadlineScheduledExecutorService.isTerminated(); // Check that the Gax's ExecutorService is terminated as well - if (usingDefaultExecutor) { + if (usingDefaultExecutor && executor instanceof ExecutorService) { isTerminated = isTerminated && ((ExecutorService) executor).isTerminated(); } return isTerminated; @@ -137,7 +138,7 @@ public boolean awaitTermination(long duration, TimeUnit unit) throws Interrupted } // Only awaitTermination for the executor if it was created by Gax. External executors // should be managed by the user. - if (usingDefaultExecutor) { + if (usingDefaultExecutor && executor instanceof ExecutorService) { boolean terminated = ((ExecutorService) executor).awaitTermination(awaitTimeNanos, unit); // Termination duration has elapsed if (!terminated) { From 04d3ecc96059d47eebf38184debc8148fe02bd3e Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Thu, 18 May 2023 17:08:36 -0400 Subject: [PATCH 06/13] chore: Implement shutdownNow() --- .../gax/httpjson/ManagedHttpJsonChannel.java | 28 +++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java index 98da25a33f..a42de0d427 100644 --- a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java +++ b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java @@ -85,7 +85,7 @@ public HttpJsonClientCall newCall( @Override public synchronized void shutdown() { - // Calling shutdown() twice should no-op + // Calling shutdown/ shutdownNow() twice should no-op if (isTransportShutdown) { return; } @@ -126,7 +126,22 @@ public boolean isTerminated() { @Override public void shutdownNow() { - shutdown(); + // Calling shutdown/ shutdownNow() twice should no-op + if (isTransportShutdown) { + return; + } + try { + // Only shutdown the executor if it was created by Gax. External executors + // should be managed by the user. + if (usingDefaultExecutor && executor instanceof ExecutorService) { + ((ExecutorService) executor).shutdownNow(); + } + deadlineScheduledExecutorService.shutdownNow(); + httpTransport.shutdown(); + isTransportShutdown = true; + } catch (IOException e) { + e.printStackTrace(); + } } @Override @@ -163,8 +178,11 @@ public static class Builder { private Executor executor; private String endpoint; private HttpTransport httpTransport; + private boolean usingDefaultExecutor; - private Builder() {} + private Builder() { + this.usingDefaultExecutor = false; + } public Builder setExecutor(Executor executor) { this.executor = executor; @@ -188,9 +206,9 @@ public ManagedHttpJsonChannel build() { // default executor to used for the calls. Only the default executor's // lifecycle will be managed by the channel. Any external executor needs to // managed by the user. - boolean usingDefaultExecutor = executor == null; - if (usingDefaultExecutor) { + if (executor == null) { executor = InstantiatingExecutorProvider.newBuilder().build().getExecutor(); + usingDefaultExecutor = true; } return new ManagedHttpJsonChannel( From 25992500a9e0346647ada412ab9eef653b4c498f Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Thu, 18 May 2023 17:37:54 -0400 Subject: [PATCH 07/13] chore: Add a tests InstantiatingHttpJsonChannelProviderTest for the channel logic --- .../gax/httpjson/ManagedHttpJsonChannel.java | 6 +++ .../ManagedHttpJsonInterceptorChannel.java | 6 +++ ...tantiatingHttpJsonChannelProviderTest.java | 49 +++++++++++++++++-- 3 files changed, 58 insertions(+), 3 deletions(-) diff --git a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java index a42de0d427..7f8a52d629 100644 --- a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java +++ b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java @@ -34,6 +34,7 @@ import com.google.api.core.BetaApi; import com.google.api.gax.core.BackgroundResource; import com.google.api.gax.core.InstantiatingExecutorProvider; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.io.IOException; import java.util.concurrent.Executor; @@ -83,6 +84,11 @@ public HttpJsonClientCall newCall( deadlineScheduledExecutorService); } + @VisibleForTesting + Executor getExecutor() { + return executor; + } + @Override public synchronized void shutdown() { // Calling shutdown/ shutdownNow() twice should no-op diff --git a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonInterceptorChannel.java b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonInterceptorChannel.java index 05321e2fd7..56ab5a5abb 100644 --- a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonInterceptorChannel.java +++ b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonInterceptorChannel.java @@ -30,6 +30,7 @@ package com.google.api.gax.httpjson; import com.google.api.core.BetaApi; +import com.google.common.annotations.VisibleForTesting; import java.util.concurrent.TimeUnit; @BetaApi @@ -45,6 +46,11 @@ class ManagedHttpJsonInterceptorChannel extends ManagedHttpJsonChannel { this.interceptor = interceptor; } + @VisibleForTesting + ManagedHttpJsonChannel getChannel() { + return channel; + } + @Override public HttpJsonClientCall newCall( ApiMethodDescriptor methodDescriptor, HttpJsonCallOptions callOptions) { diff --git a/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java b/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java index 4061ff8bad..3b751fcedf 100644 --- a/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java +++ b/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java @@ -39,6 +39,7 @@ import java.io.IOException; import java.security.GeneralSecurityException; import java.util.Collections; +import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -50,20 +51,22 @@ @RunWith(JUnit4.class) public class InstantiatingHttpJsonChannelProviderTest extends AbstractMtlsTransportChannelTest { + private static final String DEFAULT_ENDPOINT = "localhost:8080"; + private static final Map DEFAULT_HEADER_MAP = Collections.emptyMap(); + @Test public void basicTest() throws IOException { - String endpoint = "localhost:8080"; ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1); executor.shutdown(); TransportChannelProvider provider = InstantiatingHttpJsonChannelProvider.newBuilder().build(); assertThat(provider.needsEndpoint()).isTrue(); - provider = provider.withEndpoint(endpoint); + provider = provider.withEndpoint(DEFAULT_ENDPOINT); assertThat(provider.needsEndpoint()).isFalse(); assertThat(provider.needsHeaders()).isTrue(); - provider = provider.withHeaders(Collections.emptyMap()); + provider = provider.withHeaders(DEFAULT_HEADER_MAP); assertThat(provider.needsHeaders()).isFalse(); // Make sure getTransportChannel works without setting executor @@ -103,6 +106,46 @@ public void basicTest() throws IOException { provider.getTransportChannel().shutdownNow(); } + // Ensure that a default executor is created by the ManagedHttpJsonChannel even + // if not provided by the TransportChannelProvider + @Test + public void managedChannelUsesDefaultChannelExecutor() throws IOException { + InstantiatingHttpJsonChannelProvider instantiatingHttpJsonChannelProvider = + InstantiatingHttpJsonChannelProvider.newBuilder().setEndpoint(DEFAULT_ENDPOINT).build(); + instantiatingHttpJsonChannelProvider = + (InstantiatingHttpJsonChannelProvider) + instantiatingHttpJsonChannelProvider.withHeaders(DEFAULT_HEADER_MAP); + HttpJsonTransportChannel httpJsonTransportChannel = + instantiatingHttpJsonChannelProvider.getTransportChannel(); + ManagedHttpJsonInterceptorChannel interceptorChannel = + (ManagedHttpJsonInterceptorChannel) httpJsonTransportChannel.getManagedChannel(); + ManagedHttpJsonChannel managedHttpJsonChannel = interceptorChannel.getChannel(); + assertThat(managedHttpJsonChannel.getExecutor()).isNotNull(); + } + + // Ensure that the user's executor is used by the ManagedHttpJsonChannel + @Test + public void managedChannelUsesCustomExecutor() throws IOException { + ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1); + executor.shutdown(); + + InstantiatingHttpJsonChannelProvider instantiatingHttpJsonChannelProvider = + InstantiatingHttpJsonChannelProvider.newBuilder() + .setEndpoint(DEFAULT_ENDPOINT) + .setExecutor(executor) + .build(); + instantiatingHttpJsonChannelProvider = + (InstantiatingHttpJsonChannelProvider) + instantiatingHttpJsonChannelProvider.withHeaders(DEFAULT_HEADER_MAP); + HttpJsonTransportChannel httpJsonTransportChannel = + instantiatingHttpJsonChannelProvider.getTransportChannel(); + ManagedHttpJsonInterceptorChannel interceptorChannel = + (ManagedHttpJsonInterceptorChannel) httpJsonTransportChannel.getManagedChannel(); + ManagedHttpJsonChannel managedHttpJsonChannel = interceptorChannel.getChannel(); + assertThat(managedHttpJsonChannel.getExecutor()).isNotNull(); + assertThat(managedHttpJsonChannel.getExecutor()).isEqualTo(executor); + } + @Override protected Object getMtlsObjectFromTransportChannel(MtlsProvider provider) throws IOException, GeneralSecurityException { From df1bab0bb9139774b6d6a39842e27f0d23a6a356 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Fri, 19 May 2023 10:30:34 -0400 Subject: [PATCH 08/13] chore: Add comments for the tests --- .../InstantiatingHttpJsonChannelProviderTest.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java b/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java index 3b751fcedf..52e6ddefed 100644 --- a/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java +++ b/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java @@ -117,17 +117,22 @@ public void managedChannelUsesDefaultChannelExecutor() throws IOException { instantiatingHttpJsonChannelProvider.withHeaders(DEFAULT_HEADER_MAP); HttpJsonTransportChannel httpJsonTransportChannel = instantiatingHttpJsonChannelProvider.getTransportChannel(); + + // By default, the channel will be wrapped with ManagedHttpJsonInterceptorChannel ManagedHttpJsonInterceptorChannel interceptorChannel = (ManagedHttpJsonInterceptorChannel) httpJsonTransportChannel.getManagedChannel(); ManagedHttpJsonChannel managedHttpJsonChannel = interceptorChannel.getChannel(); assertThat(managedHttpJsonChannel.getExecutor()).isNotNull(); + + // Clean up the resources (executor, deadlineScheduler, httpTransport) + instantiatingHttpJsonChannelProvider.getTransportChannel().shutdownNow(); } // Ensure that the user's executor is used by the ManagedHttpJsonChannel @Test public void managedChannelUsesCustomExecutor() throws IOException { + // Custom executor to use ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1); - executor.shutdown(); InstantiatingHttpJsonChannelProvider instantiatingHttpJsonChannelProvider = InstantiatingHttpJsonChannelProvider.newBuilder() @@ -139,11 +144,16 @@ public void managedChannelUsesCustomExecutor() throws IOException { instantiatingHttpJsonChannelProvider.withHeaders(DEFAULT_HEADER_MAP); HttpJsonTransportChannel httpJsonTransportChannel = instantiatingHttpJsonChannelProvider.getTransportChannel(); + + // By default, the channel will be wrapped with ManagedHttpJsonInterceptorChannel ManagedHttpJsonInterceptorChannel interceptorChannel = (ManagedHttpJsonInterceptorChannel) httpJsonTransportChannel.getManagedChannel(); ManagedHttpJsonChannel managedHttpJsonChannel = interceptorChannel.getChannel(); assertThat(managedHttpJsonChannel.getExecutor()).isNotNull(); assertThat(managedHttpJsonChannel.getExecutor()).isEqualTo(executor); + + // Clean up the resources (executor, deadlineScheduler, httpTransport) + instantiatingHttpJsonChannelProvider.getTransportChannel().shutdownNow(); } @Override From ece7a723e17f4bb3f7913ec788e679403dbda8b7 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Wed, 31 May 2023 12:33:01 -0400 Subject: [PATCH 09/13] chore: Add a default multiplier for IO tasks --- .../gax/httpjson/ManagedHttpJsonChannel.java | 2 +- .../core/InstantiatingExecutorProvider.java | 19 ++++++++++++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java index 7f8a52d629..f156a7d182 100644 --- a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java +++ b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java @@ -213,7 +213,7 @@ public ManagedHttpJsonChannel build() { // lifecycle will be managed by the channel. Any external executor needs to // managed by the user. if (executor == null) { - executor = InstantiatingExecutorProvider.newBuilder().build().getExecutor(); + executor = InstantiatingExecutorProvider.newIOBuilder().build().getExecutor(); usingDefaultExecutor = true; } diff --git a/gax-java/gax/src/main/java/com/google/api/gax/core/InstantiatingExecutorProvider.java b/gax-java/gax/src/main/java/com/google/api/gax/core/InstantiatingExecutorProvider.java index c91b044cb9..206315d078 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/core/InstantiatingExecutorProvider.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/core/InstantiatingExecutorProvider.java @@ -54,6 +54,9 @@ public Thread newThread(Runnable runnable) { return thread; } }; + private static final int MIN_THREAD_COUNT = 4; + // Attempt to choose a reasonable default core pool multiplier for IO Bound operations + private static final int IO_THREAD_MULTIPLIER = 50; // Package-private constructor prevents others from subclassing. InstantiatingExecutorProvider() {} @@ -76,9 +79,23 @@ public boolean shouldAutoClose() { public abstract Builder toBuilder(); + // Used for CPU Bound tasks as the thread count is at max the number of processors + // Thread count is at a minimum of `MIN_THREAD_COUNT` public static Builder newBuilder() { int numCpus = Runtime.getRuntime().availableProcessors(); - int numThreads = Math.max(4, numCpus); + int numThreads = Math.max(MIN_THREAD_COUNT, numCpus); + + return new AutoValue_InstantiatingExecutorProvider.Builder() + .setExecutorThreadCount(numThreads) + .setThreadFactory(DEFAULT_THREAD_FACTORY); + } + + // Used for IO Bound tasks as the thread count scales with the number of processors + // Thread count is at a minimum of `MIN_THREAD_COUNT` * `IO_THREAD_MULTIPLIER` + public static Builder newIOBuilder() { + int numCpus = Runtime.getRuntime().availableProcessors(); + int numThreads = + Math.max(MIN_THREAD_COUNT * IO_THREAD_MULTIPLIER, numCpus * IO_THREAD_MULTIPLIER); return new AutoValue_InstantiatingExecutorProvider.Builder() .setExecutorThreadCount(numThreads) From f530462fec24fd4e33b34f4363222cc854d22911 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Thu, 1 Jun 2023 13:13:05 -0400 Subject: [PATCH 10/13] chore: Address code smell --- .../com/google/api/gax/httpjson/ManagedHttpJsonChannel.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java index f156a7d182..abc8c8036b 100644 --- a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java +++ b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java @@ -105,7 +105,8 @@ public synchronized void shutdown() { httpTransport.shutdown(); isTransportShutdown = true; } catch (IOException e) { - e.printStackTrace(); + // TODO: Log this scenario once we implemented the Cloud SDK logging. + // Swallow error if httpTransport shutdown fails } } @@ -146,7 +147,8 @@ public void shutdownNow() { httpTransport.shutdown(); isTransportShutdown = true; } catch (IOException e) { - e.printStackTrace(); + // TODO: Log this scenario once we implemented the Cloud SDK logging. + // Swallow error if httpTransport shutdown fails } } From 381975ae7becedd9d6aa102da5906a11608614c9 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Thu, 1 Jun 2023 16:10:14 -0400 Subject: [PATCH 11/13] chore: Clean up code --- .../api/gax/core/InstantiatingExecutorProvider.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/gax-java/gax/src/main/java/com/google/api/gax/core/InstantiatingExecutorProvider.java b/gax-java/gax/src/main/java/com/google/api/gax/core/InstantiatingExecutorProvider.java index 206315d078..e1670df879 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/core/InstantiatingExecutorProvider.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/core/InstantiatingExecutorProvider.java @@ -54,7 +54,7 @@ public Thread newThread(Runnable runnable) { return thread; } }; - private static final int MIN_THREAD_COUNT = 4; + private static final int MIN_CPU_AMOUNT = 4; // Attempt to choose a reasonable default core pool multiplier for IO Bound operations private static final int IO_THREAD_MULTIPLIER = 50; @@ -80,10 +80,10 @@ public boolean shouldAutoClose() { public abstract Builder toBuilder(); // Used for CPU Bound tasks as the thread count is at max the number of processors - // Thread count is at a minimum of `MIN_THREAD_COUNT` + // Thread count minimum is at least `MIN_CPU_AMOUNT` public static Builder newBuilder() { int numCpus = Runtime.getRuntime().availableProcessors(); - int numThreads = Math.max(MIN_THREAD_COUNT, numCpus); + int numThreads = Math.max(MIN_CPU_AMOUNT, numCpus); return new AutoValue_InstantiatingExecutorProvider.Builder() .setExecutorThreadCount(numThreads) @@ -91,11 +91,10 @@ public static Builder newBuilder() { } // Used for IO Bound tasks as the thread count scales with the number of processors - // Thread count is at a minimum of `MIN_THREAD_COUNT` * `IO_THREAD_MULTIPLIER` + // Thread count minimum is at least `MIN_CPU_AMOUNT` * `IO_THREAD_MULTIPLIER` public static Builder newIOBuilder() { int numCpus = Runtime.getRuntime().availableProcessors(); - int numThreads = - Math.max(MIN_THREAD_COUNT * IO_THREAD_MULTIPLIER, numCpus * IO_THREAD_MULTIPLIER); + int numThreads = IO_THREAD_MULTIPLIER * Math.max(MIN_CPU_AMOUNT, numCpus); return new AutoValue_InstantiatingExecutorProvider.Builder() .setExecutorThreadCount(numThreads) From 41b30603e67688c3274b136d6c69b32e0f7f40e0 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Fri, 2 Jun 2023 14:39:39 -0400 Subject: [PATCH 12/13] chore: Shutdown executor in test --- .../gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java b/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java index 52e6ddefed..416fcfb2da 100644 --- a/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java +++ b/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java @@ -131,8 +131,9 @@ public void managedChannelUsesDefaultChannelExecutor() throws IOException { // Ensure that the user's executor is used by the ManagedHttpJsonChannel @Test public void managedChannelUsesCustomExecutor() throws IOException { - // Custom executor to use + // Custom executor to use -- Lifecycle must be managed by this test ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1); + executor.shutdown(); InstantiatingHttpJsonChannelProvider instantiatingHttpJsonChannelProvider = InstantiatingHttpJsonChannelProvider.newBuilder() From 2ace4b1c850d3a01125642b39a2246d9734d0beb Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Fri, 2 Jun 2023 16:11:06 -0400 Subject: [PATCH 13/13] chore: Address pr comments --- .../api/gax/httpjson/ManagedHttpJsonChannel.java | 12 ++++++++---- .../api/gax/core/InstantiatingExecutorProvider.java | 6 +++--- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java index abc8c8036b..22e333752a 100644 --- a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java +++ b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java @@ -98,7 +98,7 @@ public synchronized void shutdown() { try { // Only shutdown the executor if it was created by Gax. External executors // should be managed by the user. - if (usingDefaultExecutor && executor instanceof ExecutorService) { + if (shouldManageExecutor()) { ((ExecutorService) executor).shutdown(); } deadlineScheduledExecutorService.shutdown(); @@ -115,7 +115,7 @@ public boolean isShutdown() { // TODO(lawrenceqiu): Expose an isShutdown() method for HttpTransport boolean isShutdown = isTransportShutdown && deadlineScheduledExecutorService.isShutdown(); // Check that the Gax's ExecutorService is shutdown as well - if (usingDefaultExecutor && executor instanceof ExecutorService) { + if (shouldManageExecutor()) { isShutdown = isShutdown && ((ExecutorService) executor).isShutdown(); } return isShutdown; @@ -125,7 +125,7 @@ public boolean isShutdown() { public boolean isTerminated() { boolean isTerminated = deadlineScheduledExecutorService.isTerminated(); // Check that the Gax's ExecutorService is terminated as well - if (usingDefaultExecutor && executor instanceof ExecutorService) { + if (shouldManageExecutor()) { isTerminated = isTerminated && ((ExecutorService) executor).isTerminated(); } return isTerminated; @@ -140,7 +140,7 @@ public void shutdownNow() { try { // Only shutdown the executor if it was created by Gax. External executors // should be managed by the user. - if (usingDefaultExecutor && executor instanceof ExecutorService) { + if (shouldManageExecutor()) { ((ExecutorService) executor).shutdownNow(); } deadlineScheduledExecutorService.shutdownNow(); @@ -172,6 +172,10 @@ public boolean awaitTermination(long duration, TimeUnit unit) throws Interrupted return deadlineScheduledExecutorService.awaitTermination(awaitTimeNanos, unit); } + private boolean shouldManageExecutor() { + return usingDefaultExecutor && executor instanceof ExecutorService; + } + @Override public void close() { shutdown(); diff --git a/gax-java/gax/src/main/java/com/google/api/gax/core/InstantiatingExecutorProvider.java b/gax-java/gax/src/main/java/com/google/api/gax/core/InstantiatingExecutorProvider.java index e1670df879..1faee91a4a 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/core/InstantiatingExecutorProvider.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/core/InstantiatingExecutorProvider.java @@ -54,7 +54,7 @@ public Thread newThread(Runnable runnable) { return thread; } }; - private static final int MIN_CPU_AMOUNT = 4; + private static final int MIN_THREAD_AMOUNT = 4; // Attempt to choose a reasonable default core pool multiplier for IO Bound operations private static final int IO_THREAD_MULTIPLIER = 50; @@ -83,7 +83,7 @@ public boolean shouldAutoClose() { // Thread count minimum is at least `MIN_CPU_AMOUNT` public static Builder newBuilder() { int numCpus = Runtime.getRuntime().availableProcessors(); - int numThreads = Math.max(MIN_CPU_AMOUNT, numCpus); + int numThreads = Math.max(MIN_THREAD_AMOUNT, numCpus); return new AutoValue_InstantiatingExecutorProvider.Builder() .setExecutorThreadCount(numThreads) @@ -94,7 +94,7 @@ public static Builder newBuilder() { // Thread count minimum is at least `MIN_CPU_AMOUNT` * `IO_THREAD_MULTIPLIER` public static Builder newIOBuilder() { int numCpus = Runtime.getRuntime().availableProcessors(); - int numThreads = IO_THREAD_MULTIPLIER * Math.max(MIN_CPU_AMOUNT, numCpus); + int numThreads = IO_THREAD_MULTIPLIER * Math.max(MIN_THREAD_AMOUNT, numCpus); return new AutoValue_InstantiatingExecutorProvider.Builder() .setExecutorThreadCount(numThreads)