Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

fix: stop overriding the default gRPC executor #869

Closed
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -34,7 +34,6 @@
import com.google.api.core.InternalApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.TransportChannel;
Expand All @@ -50,6 +49,7 @@
import io.grpc.alts.ComputeEngineChannelBuilder;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
Expand All @@ -76,7 +76,7 @@ public final class InstantiatingGrpcChannelProvider implements TransportChannelP
static final int MAX_POOL_SIZE = 1000;

private final int processorCount;
private final ExecutorProvider executorProvider;
@Nullable private final Executor executor;
private final HeaderProvider headerProvider;
private final String endpoint;
private final EnvironmentProvider envProvider;
Expand All @@ -95,7 +95,7 @@ public final class InstantiatingGrpcChannelProvider implements TransportChannelP

private InstantiatingGrpcChannelProvider(Builder builder) {
this.processorCount = builder.processorCount;
this.executorProvider = builder.executorProvider;
this.executor = builder.executor;
this.headerProvider = builder.headerProvider;
this.endpoint = builder.endpoint;
this.envProvider = builder.envProvider;
Expand All @@ -113,12 +113,25 @@ private InstantiatingGrpcChannelProvider(Builder builder) {

@Override
public boolean needsExecutor() {
return executorProvider == null;
// Use the default gRPC executor
return false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks strange. It seems like it can break all existing manual overrides of the executors people may have already introduced. I.e. this method is called in ClientContext.create() method to figure out if a "custom" executor from an executor should be used (passed as executorProvider in stub settings).

I guess in most cases it used to return true, which lead to using the executorProvider from stubSettings, which in its turn (unless user overrides it) leads to using the result of InstantiatingExecutorProvider.newBuilder().build();

I guess if we are talking about default behavior before this change and after this change it is ok (the default behavior will change, but the new default behavior is supposed to be better).

But if somebody really knew what they where doing and decided to override executorProvider in their stubSettings it seems like it will be ignored now, because transportChannelProvider.needsExecutor() in ClientContext.create() will always return false, ignoring the custom executor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the heart of the PR: the gax executor should not be shared with grpc. The PR proposes that channel executor should be configured on the channel providers instead of piping it through.

If you prefer to pipe it through, then the only approach I can think of is:
Add:

StubSettings#setTimerExecutor(ExecutorProvider)
StubSettings#setChannelExecutor(ChannelExecurtorProvider)

Have StubSettings#setExecutorProvider() call both the methods.

Add TransportChannelProvider#acceptsExecutor

Update ClientContext#create to have some logic like:

if (transportChannelProvider.needsExecutor() || (transportChannelProvider.acceptsExecutor() && channelExecutorProvider != null) {
   transportChannelProvider.withExecutor(channelExecutorProvider.get())
}

To me, this introduces a lot of complexity that is very hard to reason about

}

/** @deprecated Please use {@link #withExecutor(Executor) } */
@Deprecated
@Override
public TransportChannelProvider withExecutor(ScheduledExecutorService executor) {
return toBuilder().setExecutorProvider(FixedExecutorProvider.create(executor)).build();
return withExecutor((Executor) executor);
}

/**
* Sets the executor to use when constructing a new {@link TransportChannel}..
*
* <p>This can be used to override the default gRPC executor.
*/
@Override
public TransportChannelProvider withExecutor(@Nullable Executor executor) {
return toBuilder().setExecutor(executor).build();
}

@Override
Expand Down Expand Up @@ -223,7 +236,6 @@ private boolean isDirectPathEnabled(String serviceAddress) {
}

private ManagedChannel createSingleChannel() throws IOException {
ScheduledExecutorService executor = executorProvider.getExecutor();
GrpcHeaderInterceptor headerInterceptor =
new GrpcHeaderInterceptor(headerProvider.getHeaders());
GrpcMetadataHandlerInterceptor metadataHandlerInterceptor =
Expand Down Expand Up @@ -348,7 +360,7 @@ public static Builder newBuilder() {

public static final class Builder {
private int processorCount;
private ExecutorProvider executorProvider;
@Nullable private Executor executor;
private HeaderProvider headerProvider;
private String endpoint;
private EnvironmentProvider envProvider;
Expand All @@ -370,7 +382,7 @@ private Builder() {

private Builder(InstantiatingGrpcChannelProvider provider) {
this.processorCount = provider.processorCount;
this.executorProvider = provider.executorProvider;
this.executor = provider.executor;
this.headerProvider = provider.headerProvider;
this.endpoint = provider.endpoint;
this.envProvider = provider.envProvider;
Expand Down Expand Up @@ -398,16 +410,23 @@ Builder setEnvironmentProvider(EnvironmentProvider envProvider) {
return this;
}

/** @deprecated Please use {@link #setExecutor(Executor)} instead. */
@Deprecated
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
Executor executor = null;
if (executorProvider != null) {
executor = executorProvider.getExecutor();
}
return setExecutor(executor);
}

/**
* Sets the ExecutorProvider for this TransportChannelProvider.
* Sets the Executor for this TransportChannelProvider.
*
* <p>This is optional; if it is not provided, needsExecutor() will return true, meaning that an
* Executor must be provided when getChannel is called on the constructed
* TransportChannelProvider instance. Note: GrpcTransportProvider will automatically provide its
* own Executor in this circumstance when it calls getChannel.
* <p>This is optional; if it is not set, the default gRPC executor will be used.
*/
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = executorProvider;
public Builder setExecutor(@Nullable Executor executor) {
this.executor = executor;
return this;
}

Expand Down
Expand Up @@ -36,19 +36,25 @@

import com.google.api.core.ApiFunction;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.EnvironmentProvider;
import com.google.api.gax.grpc.testing.FakeServiceGrpc;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auth.oauth2.CloudShellCredentials;
import com.google.auth.oauth2.ComputeEngineCredentials;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import com.google.common.util.concurrent.SettableFuture;
import com.google.type.Color;
import com.google.type.Money;
import io.grpc.*;
import io.grpc.alts.ComputeEngineChannelBuilder;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -385,4 +391,126 @@ public void testWithPrimeChannel() throws IOException {
.primeChannel(Mockito.any(ManagedChannel.class));
}
}

@Test
public void testExecutorOverride() throws Exception {
final String expectedThreadName = "testExecutorOverrideExecutor";

ThreadPoolExecutor executor =
new ScheduledThreadPoolExecutor(
1,
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, expectedThreadName);
thread.setDaemon(true);
return thread;
}
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
});

try {
InstantiatingGrpcChannelProvider channelProvider =
InstantiatingGrpcChannelProvider.newBuilder()
// Don't actually care to connect to anything
.setEndpoint("localhost:1234")
.setHeaderProvider(FixedHeaderProvider.create())
.setExecutor(executor)
.build();

assertThat(extractExecutorThreadName(channelProvider)).isEqualTo(expectedThreadName);
} finally {
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
}
}

@Test
@Deprecated
public void testDeprecatedExecutorOverride() throws Exception {
final String expectedThreadName = "testExecutorOverrideExecutor";

ScheduledExecutorService executor =
Executors.newScheduledThreadPool(
1,
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, expectedThreadName);
thread.setDaemon(true);
return thread;
}
});

try {
InstantiatingGrpcChannelProvider channelProvider =
InstantiatingGrpcChannelProvider.newBuilder()
// Don't actually care to connect to anything
.setEndpoint("localhost:1234")
.setHeaderProvider(FixedHeaderProvider.create())
.setExecutorProvider(FixedExecutorProvider.create(executor))
.build();

assertThat(extractExecutorThreadName(channelProvider)).isEqualTo(expectedThreadName);
} finally {
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
}
}

@Test
public void testExecutorDefault() throws Exception {
InstantiatingGrpcChannelProvider channelProvider =
InstantiatingGrpcChannelProvider.newBuilder()
// Don't actually care if we connect to anything
.setEndpoint("localhost:1234")
.setHeaderProvider(FixedHeaderProvider.create())
.build();

// The default name thread name for grpc threads configured in GrpcUtil
assertThat(extractExecutorThreadName(channelProvider)).contains("grpc-default-executor");
}

/**
* Extract the name of the channel executor thread by instantiating a channel and issuing a fake
* call.
*/
private static String extractExecutorThreadName(InstantiatingGrpcChannelProvider channelProvider)
throws IOException, ExecutionException, InterruptedException {
GrpcTransportChannel transportChannel =
(GrpcTransportChannel) channelProvider.getTransportChannel();
try {
Channel channel = transportChannel.getChannel();

ClientCall<Color, Money> call =
channel.newCall(FakeServiceGrpc.METHOD_RECOGNIZE, CallOptions.DEFAULT);
Color request = Color.getDefaultInstance();

final SettableFuture<String> threadNameFuture = SettableFuture.create();

// Issue a call just to get the thread name of the channel executor
ClientCalls.asyncUnaryCall(
call,
request,
new StreamObserver<Money>() {
@Override
public void onNext(Money ignored) {
threadNameFuture.set(Thread.currentThread().getName());
}

@Override
public void onError(Throwable ignored) {
threadNameFuture.set(Thread.currentThread().getName());
}

@Override
public void onCompleted() {
threadNameFuture.set(Thread.currentThread().getName());
}
});
return threadNameFuture.get();
} finally {
transportChannel.shutdownNow();
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
transportChannel.awaitTermination(10, TimeUnit.SECONDS);
}
}
}
Expand Up @@ -50,6 +50,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -78,8 +79,15 @@ public boolean needsExecutor() {
return false;
}

/** @deprecated Please use {@link #withExecutor(Executor) } */
@Deprecated
@Override
public TransportChannelProvider withExecutor(ScheduledExecutorService executor) {
return withExecutor((Executor) executor);
}

@Override
public TransportChannelProvider withExecutor(Executor executor) {
throw new UnsupportedOperationException("LocalChannelProvider doesn't need an executor");
}

Expand Down