Skip to content

Commit

Permalink
fix: Use an unbounded pool for the GRPC executor (#997)
Browse files Browse the repository at this point in the history
This prevents lockups when user code blocks the subscriber client threads.
  • Loading branch information
dpcollins-google committed Dec 16, 2021
1 parent 2cd8b85 commit 6f3946d
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 15 deletions.
231 changes: 231 additions & 0 deletions google-cloud-pubsublite/clirr-ignored-differences.xml
@@ -0,0 +1,231 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<!-- Added method to AdminClient interface (Always okay) -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/pubsublite/AdminClient</className>
<method>*</method>
</difference>
<!-- Added abstract method to AutoValue.Builder class (Always okay) -->
<difference>
<differenceType>7013</differenceType>
<className>**/*$Builder</className>
<method>*</method>
</difference>
<!-- Blanket ignored files -->
<difference>
<differenceType>4001</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
<to>**</to>
</difference>
<difference>
<differenceType>5001</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
<to>**</to>
</difference>
<difference>
<differenceType>6000</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>6001</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
<field>*</field>
<method>*</method>
</difference>
<difference>
<differenceType>7001</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
<method>*</method>
<to>*</to>
</difference>
<difference>
<differenceType>7006</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
<method>*</method>
<to>*</to>
</difference>
<difference>
<differenceType>7009</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>7011</differenceType>
<className>com/google/cloud/pubsublite/internal/*</className>
<method>*</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>8000</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
</difference>
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>8000</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
</difference>
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
</difference>
<difference>
<differenceType>4001</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/internal/**</className>
<to>**</to>
</difference>
<difference>
<differenceType>5001</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/internal/**</className>
<to>**</to>
</difference>
<difference>
<differenceType>6000</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/internal/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>6001</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/internal/**</className>
<field>*</field>
<method>*</method>
</difference>
<difference>
<differenceType>7001</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/internal/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/internal/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/internal/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/internal/**</className>
<method>*</method>
<to>*</to>
</difference>
<difference>
<differenceType>7006</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/internal/**</className>
<method>*</method>
<to>*</to>
</difference>
<difference>
<differenceType>7009</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/internal/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>7011</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/internal/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/internal/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/internal/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>8000</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/internal/**</className>
</difference>
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/internal/**</className>
</difference>
<difference>
<differenceType>6000</differenceType>
<className>com/google/cloud/pubsublite/v1/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>6001</differenceType>
<className>com/google/cloud/pubsublite/v1/**</className>
<field>*</field>
<method>*</method>
</difference>
<difference>
<differenceType>7001</differenceType>
<className>com/google/cloud/pubsublite/v1/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/pubsublite/v1/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>7011</differenceType>
<className>com/google/cloud/pubsublite/v1/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/pubsublite/v1/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/v1/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>8000</differenceType>
<className>com/google/cloud/pubsublite/v1/**</className>
</difference>
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/pubsublite/v1/**</className>
</difference>
</differences>
Expand Up @@ -198,7 +198,7 @@ public final void onError(Throwable t) {
"Stream disconnected attempting retry, after %s milliseconds for %s",
backoffTime, streamDescription());
ScheduledFuture<?> retry =
SystemExecutors.getFuturesExecutor()
SystemExecutors.getAlarmExecutor()
.schedule(
() -> {
try {
Expand Down
Expand Up @@ -18,40 +18,34 @@

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;

import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ClientSettings;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.Endpoints;
import com.google.cloud.pubsublite.internal.Lazy;
import com.google.common.collect.ImmutableMap;
import org.threeten.bp.Duration;

public final class ServiceClients {
private ServiceClients() {}

private static final Lazy<ExecutorProvider> PROVIDER =
new Lazy<>(
() ->
FixedExecutorProvider.create(
SystemExecutors.newDaemonExecutor("pubsub-lite-service-clients")));

public static <
Settings extends ClientSettings<Settings>,
Builder extends ClientSettings.Builder<Settings, Builder>>
Settings addDefaultSettings(CloudRegion target, Builder builder) throws ApiException {
try {
return builder
.setEndpoint(Endpoints.regionalEndpoint(target))
.setExecutorProvider(PROVIDER.get())
.setBackgroundExecutorProvider(
FixedExecutorProvider.create(SystemExecutors.getAlarmExecutor()))
.setTransportChannelProvider(
InstantiatingGrpcChannelProvider.newBuilder()
.setMaxInboundMessageSize(Integer.MAX_VALUE)
.setKeepAliveTime(Duration.ofMinutes(1))
.setKeepAliveWithoutCalls(true)
.setKeepAliveTimeout(Duration.ofMinutes(1))
.setExecutor(SystemExecutors.getFuturesExecutor())
.build())
.build();
} catch (Throwable t) {
Expand Down
Expand Up @@ -18,16 +18,21 @@

import com.google.cloud.pubsublite.internal.Lazy;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

public class SystemExecutors {
private SystemExecutors() {}

private static ThreadFactory newDaemonThreadFactory(String prefix) {
return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build();
}

public static ScheduledExecutorService newDaemonExecutor(String prefix) {
return Executors.newScheduledThreadPool(
Math.max(4, Runtime.getRuntime().availableProcessors()),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build());
Math.max(4, Runtime.getRuntime().availableProcessors()), newDaemonThreadFactory(prefix));
}

private static final Lazy<ScheduledExecutorService> ALARM_EXECUTOR =
Expand All @@ -37,10 +42,14 @@ public static ScheduledExecutorService getAlarmExecutor() {
return ALARM_EXECUTOR.get();
}

private static final Lazy<ScheduledExecutorService> FUTURES_EXECUTOR =
new Lazy<>(() -> newDaemonExecutor("pubsub-lite-futures"));
private static Executor newDaemonThreadPool(String prefix) {
return Executors.newCachedThreadPool(newDaemonThreadFactory(prefix));
}

private static final Lazy<Executor> FUTURES_EXECUTOR =
new Lazy<>(() -> newDaemonThreadPool("pubsub-lite-futures"));
// An executor for future handling.
public static ScheduledExecutorService getFuturesExecutor() {
public static Executor getFuturesExecutor() {
return FUTURES_EXECUTOR.get();
}
}

0 comments on commit 6f3946d

Please sign in to comment.