Skip to content

Commit

Permalink
fix: Deadlock in RetryingConnectionImpl when tearing down a connection (#406)
Browse files (browse the repository at this point in the history)

* fix: Deadlock in RetryingConnectionImpl when tearing down a connection

Also use a shared executor for all gRPC clients to reduce the total thread count and fix errors when tearing down executors used by gRPC connections.

Also add logging to RetryingConnectionImpl

Also ensure that no call out of RetryingConnectionImpl can leak an exception.

* fix: More fixes
  • Loading branch information
dpcollins-google committed Dec 8, 2020
1 parent 509dad6 commit af77486
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 9 deletions.
@@ -0,0 +1,38 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal;

import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

/**
 * A thread-safe, lazily constructed instance of an object.
 *
 * <p>The supplier is invoked on the first call to {@link #get()} and its result is cached, so
 * later calls return the same value without invoking the supplier again. A {@code null} result is
 * cached like any other value. If the supplier throws, nothing is cached and the next call to
 * {@link #get()} invokes it again.
 *
 * @param <T> the type of the lazily constructed object
 */
public class Lazy<T> {
  @GuardedBy("this")
  private @Nullable T instance = null;

  // Tracks completion separately from the value so that a supplier which returns null is still
  // only invoked once.
  @GuardedBy("this")
  private boolean initialized = false;

  private final Supplier<T> supplier;

  /**
   * Creates a holder whose value is produced by {@code supplier} on first use.
   *
   * @param supplier produces the value; must not be null
   * @throws NullPointerException if {@code supplier} is null
   */
  public Lazy(Supplier<T> supplier) {
    // Fail at construction rather than at some later, unrelated call to get().
    if (supplier == null) {
      throw new NullPointerException("supplier");
    }
    this.supplier = supplier;
  }

  /**
   * Returns the cached value, invoking the supplier first if it has not yet completed.
   *
   * @return the value produced by the supplier
   */
  public synchronized T get() {
    if (!initialized) {
      instance = supplier.get();
      // Only reached when the supplier returned normally; a throwing supplier is retried.
      initialized = true;
    }
    return instance;
  }
}
Expand Up @@ -18,20 +18,44 @@

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;

import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ClientSettings;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.Endpoints;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.threeten.bp.Duration;

public final class ServiceClients {
private ServiceClients() {}

// Executor provider handed to every settings object built by addDefaultSettings. It is created
// on first use and sized to the number of available processors, with a floor of four threads.
private static final Lazy<ExecutorProvider> PROVIDER =
    new Lazy<>(
        () -> {
          int threadCount = Math.max(4, Runtime.getRuntime().availableProcessors());
          ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(threadCount);
          return FixedExecutorProvider.create(
              MoreExecutors.getExitingScheduledExecutorService(pool));
        });

public static <
Settings extends ClientSettings<Settings>,
Builder extends ClientSettings.Builder<Settings, Builder>>
Settings addDefaultSettings(CloudRegion target, Builder builder) throws ApiException {
try {
return builder.setEndpoint(Endpoints.regionalEndpoint(target)).build();
return builder
.setEndpoint(Endpoints.regionalEndpoint(target))
.setExecutorProvider(PROVIDER.get())
.setTransportChannelProvider(
InstantiatingGrpcChannelProvider.newBuilder()
.setMaxInboundMessageSize(Integer.MAX_VALUE)
.setKeepAliveTime(Duration.ofMinutes(1))
.setKeepAliveWithoutCalls(true)
.setKeepAliveTimeout(Duration.ofMinutes(1))
.build())
.build();
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
Expand Down
Expand Up @@ -27,12 +27,12 @@
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.common.flogger.GoogleLogger;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.concurrent.GuardedBy;
import org.threeten.bp.Duration;

/**
* A connection which recreates an underlying stream on retryable errors.
Expand Down Expand Up @@ -105,12 +105,20 @@ public void reinitialize() {
/**
 * Stops this connection.
 *
 * <p>While holding {@code connectionMonitor}, marks the connection as completed and closes the
 * current underlying connection. If the connection was already completed, returns immediately
 * without further notification. On success, shuts down {@code systemExecutor} and reports
 * {@code notifyStopped()}. Any throwable raised while closing is logged and reported through
 * {@code notifyFailed}, and the stop sequence is abandoned.
 */
@Override
protected void doStop() {
  try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
    if (completed) return;
    completed = true;
    logger.atFine().log("Terminating connection with initial request %s.", initialRequest);
    currentConnection.close();
  } catch (Throwable t) {
    // Single handler for every failure: it must return so that a failed teardown is never
    // also reported as a clean stop below.
    logger.atWarning().withCause(t).log(
        "Failed while terminating connection with initial request %s.", initialRequest);
    notifyFailed(t);
    return;
  }
  logger.atFine().log("Terminated connection with initial request %s.", initialRequest);
  systemExecutor.shutdownNow();
  notifyStopped();
}
Expand Down Expand Up @@ -150,8 +158,8 @@ public final void onResponse(ClientResponseT value) {
}
try {
observer.onClientResponse(value);
} catch (CheckedApiException e) {
setPermanentError(e);
} catch (Throwable t) {
setPermanentError(t);
}
}

Expand All @@ -173,8 +181,8 @@ public final void onError(Throwable t) {
currentConnection.close();
backoffTime = nextRetryBackoffDuration;
nextRetryBackoffDuration = Math.min(backoffTime * 2, MAX_RECONNECT_BACKOFF_TIME.toMillis());
} catch (Exception e) {
throwable = Optional.of(e);
} catch (Throwable t2) {
throwable = Optional.of(t2);
}
if (throwable.isPresent()) {
setPermanentError(
Expand All @@ -187,7 +195,17 @@ public final void onError(Throwable t) {
logger.atFine().withCause(t).log(
"Stream disconnected attempting retry, after %s milliseconds", backoffTime);
ScheduledFuture<?> retry =
systemExecutor.schedule(observer::triggerReinitialize, backoffTime, MILLISECONDS);
systemExecutor.schedule(
() -> {
try {
observer.triggerReinitialize();
} catch (Throwable t2) {
logger.atWarning().withCause(t2).log("Error occurred in triggerReinitialize.");
onError(t2);
}
},
backoffTime,
MILLISECONDS);
}

@Override
Expand Down

0 comments on commit af77486

Please sign in to comment.