Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement awaitTermination() for MangedHttpJsonChannel #1677

Merged
merged 14 commits into from Jun 2, 2023
Merged
Expand Up @@ -186,6 +186,8 @@ private HttpJsonTransportChannel createChannel() throws IOException, GeneralSecu
httpTransportToUse = createHttpTransport();
}

// Pass the executor to the ManagedChannel. If no executor was provided (or null),
// the channel will use a default executor for the calls.
ManagedHttpJsonChannel channel =
ManagedHttpJsonChannel.newBuilder()
.setEndpoint(endpoint)
Expand Down
Expand Up @@ -34,6 +34,7 @@
import com.google.api.core.BetaApi;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.concurrent.Executor;
Expand All @@ -47,23 +48,24 @@
@BetaApi
public class ManagedHttpJsonChannel implements HttpJsonChannel, BackgroundResource {

private static final ExecutorService DEFAULT_EXECUTOR =
InstantiatingExecutorProvider.newBuilder().build().getExecutor();

lqiu96 marked this conversation as resolved.
Show resolved Hide resolved
private final Executor executor;
private final boolean usingDefaultExecutor;
private final String endpoint;
private final HttpTransport httpTransport;
private final ScheduledExecutorService deadlineScheduledExecutorService;

private boolean isTransportShutdown;

protected ManagedHttpJsonChannel() {
this(null, null, null);
this(null, true, null, null);
}

private ManagedHttpJsonChannel(
Executor executor, String endpoint, @Nullable HttpTransport httpTransport) {
Executor executor,
boolean usingDefaultExecutor,
String endpoint,
@Nullable HttpTransport httpTransport) {
this.executor = executor;
this.usingDefaultExecutor = usingDefaultExecutor;
this.endpoint = endpoint;
this.httpTransport = httpTransport == null ? new NetHttpTransport() : httpTransport;
this.deadlineScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
Expand All @@ -82,58 +84,116 @@ public <RequestT, ResponseT> HttpJsonClientCall<RequestT, ResponseT> newCall(
deadlineScheduledExecutorService);
}

@VisibleForTesting
Executor getExecutor() {
return executor;
}

@Override
public synchronized void shutdown() {
// Calling shutdown/ shutdownNow() twice should no-op
if (isTransportShutdown) {
return;
}
try {
// Only shutdown the executor if it was created by Gax. External executors
// should be managed by the user.
if (usingDefaultExecutor && executor instanceof ExecutorService) {
((ExecutorService) executor).shutdown();
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved
}
deadlineScheduledExecutorService.shutdown();
httpTransport.shutdown();
isTransportShutdown = true;
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved
} catch (IOException e) {
e.printStackTrace();
// TODO: Log this scenario once we implemented the Cloud SDK logging.
// Swallow error if httpTransport shutdown fails
}
}

@Override
public boolean isShutdown() {
return isTransportShutdown;
// TODO(lawrenceqiu): Expose an isShutdown() method for HttpTransport
boolean isShutdown = isTransportShutdown && deadlineScheduledExecutorService.isShutdown();
// Check that the Gax's ExecutorService is shutdown as well
if (usingDefaultExecutor && executor instanceof ExecutorService) {
isShutdown = isShutdown && ((ExecutorService) executor).isShutdown();
}
return isShutdown;
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public boolean isTerminated() {
return isTransportShutdown;
boolean isTerminated = deadlineScheduledExecutorService.isTerminated();
// Check that the Gax's ExecutorService is terminated as well
if (usingDefaultExecutor && executor instanceof ExecutorService) {
isTerminated = isTerminated && ((ExecutorService) executor).isTerminated();
}
return isTerminated;
}

@Override
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved
public void shutdownNow() {
shutdown();
// Calling shutdown/ shutdownNow() twice should no-op
if (isTransportShutdown) {
return;
}
try {
// Only shutdown the executor if it was created by Gax. External executors
// should be managed by the user.
if (usingDefaultExecutor && executor instanceof ExecutorService) {
((ExecutorService) executor).shutdownNow();
}
deadlineScheduledExecutorService.shutdownNow();
httpTransport.shutdown();
isTransportShutdown = true;
} catch (IOException e) {
// TODO: Log this scenario once we implemented the Cloud SDK logging.
// Swallow error if httpTransport shutdown fails
}
}

@Override
public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
// TODO
return false;
long endTimeNanos = System.nanoTime() + unit.toNanos(duration);
long awaitTimeNanos = endTimeNanos - System.nanoTime();
if (awaitTimeNanos <= 0) {
return false;
}
// Only awaitTermination for the executor if it was created by Gax. External executors
// should be managed by the user.
if (usingDefaultExecutor && executor instanceof ExecutorService) {
blakeli0 marked this conversation as resolved.
Show resolved Hide resolved
boolean terminated = ((ExecutorService) executor).awaitTermination(awaitTimeNanos, unit);
// Termination duration has elapsed
if (!terminated) {
return false;
}
}
awaitTimeNanos = endTimeNanos - System.nanoTime();
return deadlineScheduledExecutorService.awaitTermination(awaitTimeNanos, unit);
}

@Override
public void close() {}
public void close() {
shutdown();
}

public static Builder newBuilder() {
return new Builder().setExecutor(DEFAULT_EXECUTOR);
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved
return new Builder();
}

public static class Builder {

private Executor executor;
private String endpoint;
private HttpTransport httpTransport;
private boolean usingDefaultExecutor;

private Builder() {}
private Builder() {
this.usingDefaultExecutor = false;
}

public Builder setExecutor(Executor executor) {
this.executor = executor == null ? DEFAULT_EXECUTOR : executor;
this.executor = executor;
return this;
}

Expand All @@ -150,8 +210,20 @@ public Builder setHttpTransport(HttpTransport httpTransport) {
public ManagedHttpJsonChannel build() {
Preconditions.checkNotNull(endpoint);

// If the executor provided for this channel is null, gax will provide a
// default executor to used for the calls. Only the default executor's
// lifecycle will be managed by the channel. Any external executor needs to
// managed by the user.
if (executor == null) {
executor = InstantiatingExecutorProvider.newIOBuilder().build().getExecutor();
usingDefaultExecutor = true;
}

return new ManagedHttpJsonChannel(
executor, endpoint, httpTransport == null ? new NetHttpTransport() : httpTransport);
executor,
usingDefaultExecutor,
endpoint,
httpTransport == null ? new NetHttpTransport() : httpTransport);
}
}
}
Expand Up @@ -30,6 +30,7 @@
package com.google.api.gax.httpjson;

import com.google.api.core.BetaApi;
import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.TimeUnit;

@BetaApi
Expand All @@ -45,6 +46,11 @@ class ManagedHttpJsonInterceptorChannel extends ManagedHttpJsonChannel {
this.interceptor = interceptor;
}

@VisibleForTesting
ManagedHttpJsonChannel getChannel() {
return channel;
}

@Override
public <RequestT, ResponseT> HttpJsonClientCall<RequestT, ResponseT> newCall(
ApiMethodDescriptor<RequestT, ResponseT> methodDescriptor, HttpJsonCallOptions callOptions) {
Expand Down
Expand Up @@ -39,6 +39,7 @@
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand All @@ -50,20 +51,22 @@
@RunWith(JUnit4.class)
public class InstantiatingHttpJsonChannelProviderTest extends AbstractMtlsTransportChannelTest {

private static final String DEFAULT_ENDPOINT = "localhost:8080";
private static final Map<String, String> DEFAULT_HEADER_MAP = Collections.emptyMap();

@Test
public void basicTest() throws IOException {
String endpoint = "localhost:8080";
ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
executor.shutdown();

TransportChannelProvider provider = InstantiatingHttpJsonChannelProvider.newBuilder().build();

assertThat(provider.needsEndpoint()).isTrue();
provider = provider.withEndpoint(endpoint);
provider = provider.withEndpoint(DEFAULT_ENDPOINT);
assertThat(provider.needsEndpoint()).isFalse();

assertThat(provider.needsHeaders()).isTrue();
provider = provider.withHeaders(Collections.<String, String>emptyMap());
provider = provider.withHeaders(DEFAULT_HEADER_MAP);
assertThat(provider.needsHeaders()).isFalse();

// Make sure getTransportChannel works without setting executor
Expand Down Expand Up @@ -103,6 +106,57 @@ public void basicTest() throws IOException {
provider.getTransportChannel().shutdownNow();
}

// Ensure that a default executor is created by the ManagedHttpJsonChannel even
// if not provided by the TransportChannelProvider
@Test
public void managedChannelUsesDefaultChannelExecutor() throws IOException {
InstantiatingHttpJsonChannelProvider instantiatingHttpJsonChannelProvider =
InstantiatingHttpJsonChannelProvider.newBuilder().setEndpoint(DEFAULT_ENDPOINT).build();
instantiatingHttpJsonChannelProvider =
(InstantiatingHttpJsonChannelProvider)
instantiatingHttpJsonChannelProvider.withHeaders(DEFAULT_HEADER_MAP);
HttpJsonTransportChannel httpJsonTransportChannel =
instantiatingHttpJsonChannelProvider.getTransportChannel();

// By default, the channel will be wrapped with ManagedHttpJsonInterceptorChannel
ManagedHttpJsonInterceptorChannel interceptorChannel =
(ManagedHttpJsonInterceptorChannel) httpJsonTransportChannel.getManagedChannel();
ManagedHttpJsonChannel managedHttpJsonChannel = interceptorChannel.getChannel();
assertThat(managedHttpJsonChannel.getExecutor()).isNotNull();

// Clean up the resources (executor, deadlineScheduler, httpTransport)
instantiatingHttpJsonChannelProvider.getTransportChannel().shutdownNow();
}

// Ensure that the user's executor is used by the ManagedHttpJsonChannel
@Test
public void managedChannelUsesCustomExecutor() throws IOException {
// Custom executor to use -- Lifecycle must be managed by this test
ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
executor.shutdown();

InstantiatingHttpJsonChannelProvider instantiatingHttpJsonChannelProvider =
InstantiatingHttpJsonChannelProvider.newBuilder()
.setEndpoint(DEFAULT_ENDPOINT)
.setExecutor(executor)
.build();
instantiatingHttpJsonChannelProvider =
(InstantiatingHttpJsonChannelProvider)
instantiatingHttpJsonChannelProvider.withHeaders(DEFAULT_HEADER_MAP);
HttpJsonTransportChannel httpJsonTransportChannel =
instantiatingHttpJsonChannelProvider.getTransportChannel();

// By default, the channel will be wrapped with ManagedHttpJsonInterceptorChannel
ManagedHttpJsonInterceptorChannel interceptorChannel =
(ManagedHttpJsonInterceptorChannel) httpJsonTransportChannel.getManagedChannel();
ManagedHttpJsonChannel managedHttpJsonChannel = interceptorChannel.getChannel();
assertThat(managedHttpJsonChannel.getExecutor()).isNotNull();
assertThat(managedHttpJsonChannel.getExecutor()).isEqualTo(executor);

// Clean up the resources (executor, deadlineScheduler, httpTransport)
instantiatingHttpJsonChannelProvider.getTransportChannel().shutdownNow();
}

@Override
protected Object getMtlsObjectFromTransportChannel(MtlsProvider provider)
throws IOException, GeneralSecurityException {
Expand Down
Expand Up @@ -54,6 +54,9 @@ public Thread newThread(Runnable runnable) {
return thread;
}
};
private static final int MIN_CPU_AMOUNT = 4;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be MIN_THREAD_AMOUNT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Updating.

// Attempt to choose a reasonable default core pool multiplier for IO Bound operations
private static final int IO_THREAD_MULTIPLIER = 50;

// Package-private constructor prevents others from subclassing.
InstantiatingExecutorProvider() {}
Expand All @@ -76,9 +79,22 @@ public boolean shouldAutoClose() {

public abstract Builder toBuilder();

// Used for CPU Bound tasks as the thread count is at max the number of processors
// Thread count minimum is at least `MIN_CPU_AMOUNT`
public static Builder newBuilder() {
int numCpus = Runtime.getRuntime().availableProcessors();
int numThreads = Math.max(4, numCpus);
int numThreads = Math.max(MIN_CPU_AMOUNT, numCpus);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was expecting this change only in ManagedHttpJsonChannel, changing it here should be fine as well, but be aware that changing this in InstantiatingExecutorProvider would affect the current background tasks used for grpc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense.I created a new builder: newIOBuilder that should only be used for HttpJson's async calls and wouldn't change the background executor.


return new AutoValue_InstantiatingExecutorProvider.Builder()
.setExecutorThreadCount(numThreads)
.setThreadFactory(DEFAULT_THREAD_FACTORY);
}

// Used for IO Bound tasks as the thread count scales with the number of processors
// Thread count minimum is at least `MIN_CPU_AMOUNT` * `IO_THREAD_MULTIPLIER`
public static Builder newIOBuilder() {
int numCpus = Runtime.getRuntime().availableProcessors();
int numThreads = IO_THREAD_MULTIPLIER * Math.max(MIN_CPU_AMOUNT, numCpus);

return new AutoValue_InstantiatingExecutorProvider.Builder()
.setExecutorThreadCount(numThreads)
Expand Down