Skip to content

Commit

Permalink
fix: Change all DirectExecutors to use a SystemExecutor and fix servi…
Browse files Browse the repository at this point in the history
…ce shutdowns (#744)

Also deflake the world so all tests run 100 times in a row successfully even with the new executors, and centralize alarm creation to fix polling test bugs due to scheduleAtFixedRate and test flakiness.
  • Loading branch information
dpcollins-google committed Jul 9, 2021
1 parent 245004e commit c8541aa
Show file tree
Hide file tree
Showing 38 changed files with 425 additions and 316 deletions.
Expand Up @@ -18,8 +18,8 @@

import static com.google.cloud.pubsublite.internal.CheckedApiPreconditions.checkState;
import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown;

import com.google.api.core.ApiService;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
Expand All @@ -28,8 +28,9 @@
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.Assigner;
import com.google.cloud.pubsublite.internal.wire.AssignerFactory;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.flogger.GoogleLogger;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -38,6 +39,7 @@
import java.util.Set;

public class AssigningSubscriber extends ProxyService implements Subscriber {
private static final GoogleLogger LOG = GoogleLogger.forEnclosingClass();
private final PartitionSubscriberFactory subscriberFactory;

private final CloseableMonitor monitor = new CloseableMonitor();
Expand Down Expand Up @@ -66,10 +68,9 @@ protected void start() {}
protected void stop() {
try (CloseableMonitor.Hold h = monitor.enter()) {
shutdown = true;
liveSubscriberMap.values().forEach(ApiService::stopAsync);
liveSubscriberMap.values().forEach(ApiService::awaitTerminated);
blockingShutdown(liveSubscriberMap.values());
liveSubscriberMap.clear();
stoppingSubscribers.forEach(Subscriber::awaitTerminated);
blockingShutdown(stoppingSubscribers);
}
}

Expand Down Expand Up @@ -115,7 +116,7 @@ public void terminated(State from) {
}
}
},
MoreExecutors.directExecutor());
SystemExecutors.getFuturesExecutor());
liveSubscriberMap.put(partition, subscriber);
subscriber.startAsync();
}
Expand Down
Expand Up @@ -30,9 +30,9 @@
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import java.util.List;

Expand Down Expand Up @@ -117,7 +117,7 @@ public void onSuccess(Void result) {
ack();
}
},
MoreExecutors.directExecutor());
SystemExecutors.getFuturesExecutor());
}
};
receiver.receiveMessage(userMessage, clientConsumer);
Expand Down
Expand Up @@ -27,7 +27,7 @@
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.TrivialProxyService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.pubsub.v1.PubsubMessage;

// A WrappingPublisher wraps the wire protocol client with a Cloud Pub/Sub api compliant
Expand Down Expand Up @@ -59,6 +59,6 @@ public ApiFuture<String> publish(PubsubMessage message) {
return ApiFutures.transform(
wirePublisher.publish(wireMessage),
MessageMetadata::encode,
MoreExecutors.directExecutor());
SystemExecutors.getFuturesExecutor());
}
}
Expand Up @@ -27,6 +27,7 @@
import com.google.cloud.pubsublite.SeekTarget;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.cloud.pubsublite.proto.CreateReservationRequest;
import com.google.cloud.pubsublite.proto.CreateSubscriptionRequest;
import com.google.cloud.pubsublite.proto.CreateTopicRequest;
Expand Down Expand Up @@ -58,7 +59,6 @@
import com.google.cloud.pubsublite.proto.UpdateTopicRequest;
import com.google.cloud.pubsublite.v1.AdminServiceClient;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.FieldMask;
import java.util.List;

Expand Down Expand Up @@ -104,7 +104,7 @@ public ApiFuture<Long> getTopicPartitionCount(TopicPath path) {
.getTopicPartitionsCallable()
.futureCall(GetTopicPartitionsRequest.newBuilder().setName(path.toString()).build()),
TopicPartitions::getPartitionCount,
MoreExecutors.directExecutor());
SystemExecutors.getFuturesExecutor());
}

@Override
Expand All @@ -114,7 +114,7 @@ public ApiFuture<List<Topic>> listTopics(LocationPath path) {
.listTopicsCallable()
.futureCall(ListTopicsRequest.newBuilder().setParent(path.toString()).build()),
ListTopicsResponse::getTopicsList,
MoreExecutors.directExecutor());
SystemExecutors.getFuturesExecutor());
}

@Override
Expand All @@ -131,7 +131,7 @@ public ApiFuture<Void> deleteTopic(TopicPath path) {
.deleteTopicCallable()
.futureCall(DeleteTopicRequest.newBuilder().setName(path.toString()).build()),
x -> null,
MoreExecutors.directExecutor());
SystemExecutors.getFuturesExecutor());
}

@Override
Expand All @@ -148,7 +148,7 @@ public ApiFuture<List<SubscriptionPath>> listTopicSubscriptions(TopicPath path)
}
return builder.build();
},
MoreExecutors.directExecutor());
SystemExecutors.getFuturesExecutor());
}

@Override
Expand Down Expand Up @@ -180,7 +180,7 @@ public ApiFuture<List<Subscription>> listSubscriptions(LocationPath path) {
.listSubscriptionsCallable()
.futureCall(ListSubscriptionsRequest.newBuilder().setParent(path.toString()).build()),
ListSubscriptionsResponse::getSubscriptionsList,
MoreExecutors.directExecutor());
SystemExecutors.getFuturesExecutor());
}

@Override
Expand Down Expand Up @@ -227,7 +227,7 @@ public ApiFuture<Void> deleteSubscription(SubscriptionPath path) {
.deleteSubscriptionCallable()
.futureCall(DeleteSubscriptionRequest.newBuilder().setName(path.toString()).build()),
x -> null,
MoreExecutors.directExecutor());
SystemExecutors.getFuturesExecutor());
}

@Override
Expand Down Expand Up @@ -257,7 +257,7 @@ public ApiFuture<List<Reservation>> listReservations(LocationPath path) {
.listReservationsCallable()
.futureCall(ListReservationsRequest.newBuilder().setParent(path.toString()).build()),
ListReservationsResponse::getReservationsList,
MoreExecutors.directExecutor());
SystemExecutors.getFuturesExecutor());
}

@Override
Expand All @@ -278,7 +278,7 @@ public ApiFuture<Void> deleteReservation(ReservationPath path) {
.deleteReservationCallable()
.futureCall(DeleteReservationRequest.newBuilder().setName(path.toString()).build()),
x -> null,
MoreExecutors.directExecutor());
SystemExecutors.getFuturesExecutor());
}

@Override
Expand All @@ -294,6 +294,6 @@ public ApiFuture<List<TopicPath>> listReservationTopics(ReservationPath path) {
}
return builder.build();
},
MoreExecutors.directExecutor());
SystemExecutors.getFuturesExecutor());
}
}
@@ -0,0 +1,50 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.pubsublite.internal;

import static java.util.concurrent.TimeUnit.NANOSECONDS;

import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.common.flogger.GoogleLogger;
import java.time.Duration;
import java.util.concurrent.Future;

// An alarm factory comes with a builtin delay and constructs a future which runs that delay after
// it finishes.
public interface AlarmFactory {
Future<?> newAlarm(Runnable runnable);

// Get around lack of interface support for private static members in java 8
final class Internal {
private static final GoogleLogger LOGGER = GoogleLogger.forEnclosingClass();
};

static AlarmFactory create(Duration duration) {
return runnable ->
SystemExecutors.getAlarmExecutor()
.scheduleWithFixedDelay(
() -> {
try {
runnable.run();
} catch (Throwable t) {
Internal.LOGGER.atSevere().withCause(t).log("Alarm leaked exception.");
}
},
0,
duration.toNanos(),
NANOSECONDS);
}
}
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.pubsublite.internal;

import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService.Listener;
Expand All @@ -26,8 +28,8 @@
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayDeque;
import java.util.Collection;
Expand Down Expand Up @@ -57,7 +59,7 @@ public void failed(State state, Throwable throwable) {
fail(ExtractStatus.toCanonical(throwable));
}
},
MoreExecutors.directExecutor());
SystemExecutors.getFuturesExecutor());
underlying.startAsync().awaitRunning();
underlying.allowFlow(
FlowControlRequest.newBuilder()
Expand Down Expand Up @@ -123,6 +125,6 @@ public void close() {
"Subscriber client shut down", StatusCode.Code.UNAVAILABLE));
}
}
underlying.stopAsync().awaitTerminated();
blockingShutdown(underlying);
}
}
Expand Up @@ -16,17 +16,19 @@

package com.google.cloud.pubsublite.internal;

import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown;

import com.google.api.core.ApiService.Listener;
import com.google.api.core.ApiService.State;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayDeque;
import java.util.Collection;
Expand Down Expand Up @@ -56,7 +58,7 @@ public void failed(State state, Throwable throwable) {
fail(ExtractStatus.toCanonical(throwable));
}
},
MoreExecutors.directExecutor());
SystemExecutors.getFuturesExecutor());
underlying.startAsync().awaitRunning();
underlying.allowFlow(
FlowControlRequest.newBuilder()
Expand Down Expand Up @@ -100,6 +102,6 @@ public synchronized Optional<Offset> nextOffset() {

@Override
public void close() {
underlying.stopAsync().awaitTerminated();
blockingShutdown(underlying);
}
}
Expand Up @@ -21,13 +21,13 @@
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.cloud.pubsublite.proto.CommitCursorRequest;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.ListPartitionCursorsRequest;
import com.google.cloud.pubsublite.proto.PartitionCursor;
import com.google.cloud.pubsublite.v1.CursorServiceClient;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Map;

public class CursorClientImpl extends ApiResourceAggregation implements CursorClient {
Expand Down Expand Up @@ -61,7 +61,7 @@ public ApiFuture<Map<Partition, Offset>> listPartitionCursors(SubscriptionPath p
}
return resultBuilder.build();
},
MoreExecutors.directExecutor());
SystemExecutors.getFuturesExecutor());
}

@Override
Expand All @@ -76,6 +76,6 @@ public ApiFuture<Void> commitCursor(SubscriptionPath path, Partition partition,
.setCursor(Cursor.newBuilder().setOffset(offset.value()))
.build()),
x -> null,
MoreExecutors.directExecutor());
SystemExecutors.getFuturesExecutor());
}
}
Expand Up @@ -20,7 +20,7 @@
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -53,7 +53,7 @@ public static <T> ApiFuture<T> toClientFuture(ApiFuture<T> source) {
source,
Throwable.class,
t -> ApiFutures.immediateFailedFuture(toCanonical(t).underlying),
MoreExecutors.directExecutor());
SystemExecutors.getFuturesExecutor());
}

public static void addFailureHandler(
Expand All @@ -66,7 +66,7 @@ public static void addFailureHandler(
consumer.accept(toCanonical(e));
}
},
MoreExecutors.directExecutor());
SystemExecutors.getFuturesExecutor());
}

public interface StatusFunction<I, O> {
Expand Down

0 comments on commit c8541aa

Please sign in to comment.