Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement awaitTermination() for MangedHttpJsonChannel #1677

Merged
merged 14 commits into from Jun 2, 2023
Expand Up @@ -47,23 +47,24 @@
@BetaApi
public class ManagedHttpJsonChannel implements HttpJsonChannel, BackgroundResource {

private static final ExecutorService DEFAULT_EXECUTOR =
InstantiatingExecutorProvider.newBuilder().build().getExecutor();

lqiu96 marked this conversation as resolved.
Show resolved Hide resolved
private final Executor executor;
private final boolean usingDefaultExecutor;
private final String endpoint;
private final HttpTransport httpTransport;
private final ScheduledExecutorService deadlineScheduledExecutorService;

private boolean isTransportShutdown;

protected ManagedHttpJsonChannel() {
this(null, null, null);
this(null, true, null, null);
}

private ManagedHttpJsonChannel(
Executor executor, String endpoint, @Nullable HttpTransport httpTransport) {
Executor executor,
boolean usingDefaultExecutor,
String endpoint,
@Nullable HttpTransport httpTransport) {
this.executor = executor;
this.usingDefaultExecutor = usingDefaultExecutor;
this.endpoint = endpoint;
this.httpTransport = httpTransport == null ? new NetHttpTransport() : httpTransport;
this.deadlineScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
Expand All @@ -84,10 +85,16 @@ public <RequestT, ResponseT> HttpJsonClientCall<RequestT, ResponseT> newCall(

@Override
public synchronized void shutdown() {
// Calling shutdown() twice should no-op
if (isTransportShutdown) {
return;
}
try {
// Only shutdown the executor if it was created by Gax. External executors
// should be managed by the user.
if (usingDefaultExecutor) {
((ExecutorService) executor).shutdown();
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved
}
deadlineScheduledExecutorService.shutdown();
httpTransport.shutdown();
isTransportShutdown = true;
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -98,12 +105,22 @@ public synchronized void shutdown() {

@Override
public boolean isShutdown() {
return isTransportShutdown;
boolean isShutdown = deadlineScheduledExecutorService.isShutdown();
// Check that the Gax's ExecutorService is shutdown as well
if (usingDefaultExecutor) {
isShutdown = isShutdown && ((ExecutorService) executor).isShutdown();
}
return isShutdown;
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public boolean isTerminated() {
return isTransportShutdown;
boolean isTerminated = deadlineScheduledExecutorService.isTerminated();
// Check that the Gax's ExecutorService is terminated as well
if (usingDefaultExecutor) {
isTerminated = isTerminated && ((ExecutorService) executor).isTerminated();
}
return isTerminated;
}

@Override
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -113,15 +130,31 @@ public void shutdownNow() {

@Override
public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
// TODO
return false;
long endTimeNanos = System.nanoTime() + unit.toNanos(duration);
long awaitTimeNanos = endTimeNanos - System.nanoTime();
if (awaitTimeNanos <= 0) {
return false;
}
// Only awaitTermination for the executor if it was created by Gax. External executors
// should be managed by the user.
if (usingDefaultExecutor) {
boolean terminated = ((ExecutorService) executor).awaitTermination(awaitTimeNanos, unit);
// Termination duration has elapsed
if (!terminated) {
return false;
}
}
awaitTimeNanos = endTimeNanos - System.nanoTime();
return deadlineScheduledExecutorService.awaitTermination(awaitTimeNanos, unit);
}

@Override
public void close() {}
public void close() {
shutdown();
}

public static Builder newBuilder() {
return new Builder().setExecutor(DEFAULT_EXECUTOR);
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved
return new Builder();
}

public static class Builder {
Expand All @@ -133,7 +166,7 @@ public static class Builder {
private Builder() {}

public Builder setExecutor(Executor executor) {
this.executor = executor == null ? DEFAULT_EXECUTOR : executor;
this.executor = executor;
return this;
}

Expand All @@ -150,8 +183,16 @@ public Builder setHttpTransport(HttpTransport httpTransport) {
public ManagedHttpJsonChannel build() {
Preconditions.checkNotNull(endpoint);

boolean usingDefaultExecutor = executor == null;
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved
if (usingDefaultExecutor) {
executor = InstantiatingExecutorProvider.newBuilder().build().getExecutor();
}
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved

return new ManagedHttpJsonChannel(
executor, endpoint, httpTransport == null ? new NetHttpTransport() : httpTransport);
executor,
usingDefaultExecutor,
endpoint,
httpTransport == null ? new NetHttpTransport() : httpTransport);
}
}
}