From f441044e0fa88f4b72692f55aa86203b2a438b2e Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Wed, 15 Sep 2021 14:31:42 -0400 Subject: [PATCH 1/2] feat: Batch commit requests also merge TrivialProxyService into ProxyService --- .../internal/MultiPartitionSubscriber.java | 4 +- .../internal/WrappingPublisher.java | 4 +- .../pubsublite/internal/MoreApiFutures.java | 40 ++++++ .../pubsublite/internal/ProxyService.java | 15 ++- .../internal/TrivialProxyService.java | 43 ------- .../internal/wire/ApiExceptionCommitter.java | 4 +- .../internal/wire/ApiExceptionPublisher.java | 4 +- .../internal/wire/ApiServiceUtils.java | 2 +- .../internal/wire/AssignerImpl.java | 8 +- .../internal/wire/BatchingCommitter.java | 71 +++++++++++ .../internal/wire/CommitterImpl.java | 7 +- .../internal/wire/CommitterSettings.java | 7 +- .../wire/PartitionCountWatchingPublisher.java | 3 - .../internal/wire/PublisherImpl.java | 4 +- .../internal/wire/RoutingPublisher.java | 4 +- .../wire/SinglePartitionPublisher.java | 4 +- .../internal/wire/SubscriberImpl.java | 4 +- .../internal/wire/BatchingCommitterTest.java | 116 ++++++++++++++++++ 18 files changed, 266 insertions(+), 78 deletions(-) create mode 100644 google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/MoreApiFutures.java delete mode 100644 google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/TrivialProxyService.java create mode 100644 google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchingCommitter.java create mode 100644 google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/BatchingCommitterTest.java diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/MultiPartitionSubscriber.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/MultiPartitionSubscriber.java index e8dfddb72..f6cb10f98 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/MultiPartitionSubscriber.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/MultiPartitionSubscriber.java @@ -18,12 +18,12 @@ import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.cloudpubsub.Subscriber; -import com.google.cloud.pubsublite.internal.TrivialProxyService; +import com.google.cloud.pubsublite.internal.ProxyService; import java.util.List; // A MultiPartitionSubscriber wraps multiple subscribers into a single ApiService that can be // interacted with. If any single subscriber fails, all others are stopped. -public class MultiPartitionSubscriber extends TrivialProxyService implements Subscriber { +public class MultiPartitionSubscriber extends ProxyService implements Subscriber { public static Subscriber of(List subscribers) throws ApiException { return new MultiPartitionSubscriber(subscribers); } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java index 21fcc933c..8d1c6f67f 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java @@ -26,13 +26,13 @@ import com.google.cloud.pubsublite.MessageTransformer; import com.google.cloud.pubsublite.cloudpubsub.Publisher; import com.google.cloud.pubsublite.internal.CheckedApiException; -import com.google.cloud.pubsublite.internal.TrivialProxyService; +import com.google.cloud.pubsublite.internal.ProxyService; import com.google.cloud.pubsublite.internal.wire.SystemExecutors; import com.google.pubsub.v1.PubsubMessage; // A WrappingPublisher wraps the wire protocol client with a Cloud Pub/Sub api compliant // publisher. It encodes a MessageMetadata object in the response string. -public class WrappingPublisher extends TrivialProxyService implements Publisher { +public class WrappingPublisher extends ProxyService implements Publisher { private final com.google.cloud.pubsublite.internal.Publisher wirePublisher; private final MessageTransformer transformer; diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/MoreApiFutures.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/MoreApiFutures.java new file mode 100644 index 000000000..b625151ea --- /dev/null +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/MoreApiFutures.java @@ -0,0 +1,40 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.internal; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.core.SettableApiFuture; +import com.google.cloud.pubsublite.internal.wire.SystemExecutors; + +public final class MoreApiFutures { +private MoreApiFutures() {} + public static void connectFutures(ApiFuture source, SettableApiFuture toConnect) { + ApiFutures.addCallback(source, new ApiFutureCallback() { + @Override + public void onFailure(Throwable throwable) { + toConnect.setException(throwable); + } + + @Override + public void onSuccess(T t) { + toConnect.set(t); + } + }, SystemExecutors.getFuturesExecutor()); + } +} diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/ProxyService.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/ProxyService.java index 7b55658cc..8947e6315 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/ProxyService.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/ProxyService.java @@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.flogger.GoogleLogger; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -41,7 +42,13 @@ public abstract class ProxyService extends AbstractApiService { private final List services = new ArrayList<>(); private final AtomicBoolean stoppedOrFailed = new AtomicBoolean(false); - protected ProxyService() {} + protected ProxyService(Collection services) { + addServices(services); + } + + protected ProxyService(ApiService... services) throws ApiException { + this(Arrays.asList(services)); + } // Add a new ApiServices to this. Requires that all of them are in state NEW and this is in state // NEW. @@ -59,13 +66,13 @@ protected final void addServices(ApiService... services) throws ApiException { } // Method to be called on service start after dependent services start. - protected abstract void start() throws CheckedApiException; + protected void start() throws CheckedApiException {} // Method to be called on service stop before dependent services stop. - protected abstract void stop() throws CheckedApiException; + protected void stop() throws CheckedApiException {} // Method to be called for class-specific permanent error handling after trying to stop all other // services. May not throw. - protected abstract void handlePermanentError(CheckedApiException error); + protected void handlePermanentError(CheckedApiException error) {} // Tries to stop all dependent services and sets this service into the FAILED state. protected final void onPermanentError(CheckedApiException error) { diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/TrivialProxyService.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/TrivialProxyService.java deleted file mode 100644 index ea8285ac6..000000000 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/TrivialProxyService.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsublite.internal; - -import com.google.api.core.ApiService; -import com.google.api.gax.rpc.ApiException; -import java.util.Arrays; -import java.util.Collection; - -/** A ProxyService that just wraps all ApiService methods. */ -public class TrivialProxyService extends ProxyService { - protected TrivialProxyService(Collection services) throws ApiException { - super(); - addServices(services); - } - - protected TrivialProxyService(ApiService... services) throws ApiException { - this(Arrays.asList(services)); - } - - @Override - protected final void start() {} - - @Override - protected final void stop() {} - - @Override - protected final void handlePermanentError(CheckedApiException error) {} -} diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiExceptionCommitter.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiExceptionCommitter.java index 30612f0ee..930e9d561 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiExceptionCommitter.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiExceptionCommitter.java @@ -22,9 +22,9 @@ import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.internal.CheckedApiException; -import com.google.cloud.pubsublite.internal.TrivialProxyService; +import com.google.cloud.pubsublite.internal.ProxyService; -class ApiExceptionCommitter extends TrivialProxyService implements Committer { +class ApiExceptionCommitter extends ProxyService implements Committer { private final Committer committer; ApiExceptionCommitter(Committer committer) throws ApiException { diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiExceptionPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiExceptionPublisher.java index 30ad6cca4..fe06d066b 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiExceptionPublisher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiExceptionPublisher.java @@ -21,11 +21,11 @@ import com.google.api.core.ApiFuture; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.internal.ProxyService; import com.google.cloud.pubsublite.internal.Publisher; -import com.google.cloud.pubsublite.internal.TrivialProxyService; import java.io.IOException; -public class ApiExceptionPublisher extends TrivialProxyService implements Publisher { +public class ApiExceptionPublisher extends ProxyService implements Publisher { private final Publisher publisher; ApiExceptionPublisher(Publisher publisher) throws ApiException { diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiServiceUtils.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiServiceUtils.java index 40e0bbc1b..e9f54258d 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiServiceUtils.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiServiceUtils.java @@ -31,7 +31,7 @@ public class ApiServiceUtils { private ApiServiceUtils() {} - public static ApiService backgroundResourceAsApiService(BackgroundResource resource) { + public static ApiService autoCloseableAsApiService(AutoCloseable resource) { return new AbstractApiService() { @Override protected void doStart() { diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerImpl.java index 2606f8fea..ad8161ede 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerImpl.java @@ -16,13 +16,13 @@ package com.google.cloud.pubsublite.internal.wire; -import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.backgroundResourceAsApiService; +import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.autoCloseableAsApiService; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.cloud.pubsublite.internal.CloseableMonitor; -import com.google.cloud.pubsublite.internal.TrivialProxyService; +import com.google.cloud.pubsublite.internal.ProxyService; import com.google.cloud.pubsublite.proto.InitialPartitionAssignmentRequest; import com.google.cloud.pubsublite.proto.PartitionAssignment; import com.google.cloud.pubsublite.proto.PartitionAssignmentRequest; @@ -33,7 +33,7 @@ import java.util.HashSet; import java.util.Set; -public class AssignerImpl extends TrivialProxyService +public class AssignerImpl extends ProxyService implements Assigner, RetryingConnectionObserver { private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); @@ -72,7 +72,7 @@ public AssignerImpl( new ConnectedAssignerImpl.Factory(), initialRequest, receiver); - addServices(backgroundResourceAsApiService(client)); + addServices(autoCloseableAsApiService(client)); } @Override diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchingCommitter.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchingCommitter.java new file mode 100644 index 000000000..2f7c12bc1 --- /dev/null +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchingCommitter.java @@ -0,0 +1,71 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.internal.wire; + +import com.google.api.core.ApiFuture; +import com.google.api.core.SettableApiFuture; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.internal.AlarmFactory; +import com.google.cloud.pubsublite.internal.CheckedApiException; +import com.google.cloud.pubsublite.internal.MoreApiFutures; +import com.google.cloud.pubsublite.internal.ProxyService; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import java.util.Optional; +import java.util.concurrent.Future; + +public class BatchingCommitter extends ProxyService implements Committer { + private final Committer underlying; + + @GuardedBy("this") + private SettableApiFuture currentFuture = SettableApiFuture.create(); + @GuardedBy("this") + private Optional currentOffset = Optional.empty(); + + BatchingCommitter(Committer underlying, AlarmFactory alarmFactory) { + super(underlying); + this.underlying = underlying; + Future alarm = alarmFactory.newAlarm(this::flush); + addServices(ApiServiceUtils.autoCloseableAsApiService(() -> alarm.cancel(false))); + } + + @Override + public synchronized ApiFuture commitOffset(Offset offset) { + currentOffset = Optional.of(offset); + return currentFuture; + } + + @Override + public void waitUntilEmpty() throws CheckedApiException { + flush(); + underlying.waitUntilEmpty(); + } + + @Override + protected void stop() { + flush(); + } + + private synchronized void flush() { + if (!currentOffset.isPresent()) { + return; + } + ApiFuture underlyingFuture = underlying.commitOffset(currentOffset.get()); + MoreApiFutures.connectFutures(underlyingFuture, currentFuture); + currentOffset = Optional.empty(); + currentFuture = SettableApiFuture.create(); + } +} diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterImpl.java index a55bc3114..36920b34e 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterImpl.java @@ -17,7 +17,7 @@ package com.google.cloud.pubsublite.internal.wire; import static com.google.cloud.pubsublite.internal.CheckedApiPreconditions.checkState; -import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.backgroundResourceAsApiService; +import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.autoCloseableAsApiService; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; @@ -81,7 +81,7 @@ public CommitterImpl(CursorServiceClient client, InitialCommitCursorRequest requ stream -> client.streamingCommitCursorCallable().splitCall(stream), new ConnectedCommitterImpl.Factory(), request); - addServices(backgroundResourceAsApiService(client)); + addServices(autoCloseableAsApiService(client)); } // ProxyService implementation. @@ -94,9 +94,6 @@ protected void handlePermanentError(CheckedApiException error) { } } - @Override - protected void start() {} - @Override protected void stop() { try (CloseableMonitor.Hold h = monitor.enter()) { diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterSettings.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterSettings.java index 45c663163..3d39e8cb1 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterSettings.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterSettings.java @@ -19,8 +19,10 @@ import com.google.auto.value.AutoValue; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.SubscriptionPath; +import com.google.cloud.pubsublite.internal.AlarmFactory; import com.google.cloud.pubsublite.proto.InitialCommitCursorRequest; import com.google.cloud.pubsublite.v1.CursorServiceClient; +import java.time.Duration; @AutoValue public abstract class CommitterSettings { @@ -53,7 +55,8 @@ public Committer instantiate() { .setSubscription(subscriptionPath().toString()) .setPartition(partition().value()) .build(); - return new ApiExceptionCommitter( - new CommitterImpl(serviceClient(), initialCommitCursorRequest)); + return new ApiExceptionCommitter(new BatchingCommitter( + new CommitterImpl(serviceClient(), initialCommitCursorRequest), + AlarmFactory.create(Duration.ofMillis(50)))); } } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java index de394cd16..8481d089a 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java @@ -200,9 +200,6 @@ private void handleConfig(long partitionCount) { } } - @Override - protected void start() {} - @Override protected void stop() { Optional current; diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java index 4e6d373c9..905c11710 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java @@ -17,7 +17,7 @@ package com.google.cloud.pubsublite.internal.wire; import static com.google.cloud.pubsublite.internal.CheckedApiPreconditions.checkState; -import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.backgroundResourceAsApiService; +import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.autoCloseableAsApiService; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; @@ -136,7 +136,7 @@ public PublisherImpl( Objects.requireNonNull(batchingSettings.getDelayThreshold()).toNanos())), initialRequest, batchingSettings); - addServices(backgroundResourceAsApiService(client)); + addServices(autoCloseableAsApiService(client)); } @GuardedBy("monitor.monitor") diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisher.java index ab912202b..966b64b42 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisher.java @@ -26,13 +26,13 @@ import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.internal.CheckedApiException; +import com.google.cloud.pubsublite.internal.ProxyService; import com.google.cloud.pubsublite.internal.Publisher; import com.google.cloud.pubsublite.internal.RoutingPolicy; -import com.google.cloud.pubsublite.internal.TrivialProxyService; import java.io.IOException; import java.util.Map; -public class RoutingPublisher extends TrivialProxyService implements Publisher { +public class RoutingPublisher extends ProxyService implements Publisher { private final Map> partitionPublishers; private final RoutingPolicy policy; diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisher.java index f30c04b12..b5708d591 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisher.java @@ -23,11 +23,11 @@ import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.internal.ProxyService; import com.google.cloud.pubsublite.internal.Publisher; -import com.google.cloud.pubsublite.internal.TrivialProxyService; import java.io.IOException; -public class SinglePartitionPublisher extends TrivialProxyService +public class SinglePartitionPublisher extends ProxyService implements Publisher { private final Publisher publisher; private final Partition partition; diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java index 73771855a..b3afaabff 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java @@ -17,7 +17,7 @@ package com.google.cloud.pubsublite.internal.wire; import static com.google.cloud.pubsublite.internal.CheckedApiPreconditions.checkArgument; -import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.backgroundResourceAsApiService; +import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.autoCloseableAsApiService; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.SequencedMessage; @@ -107,7 +107,7 @@ public SubscriberImpl( initialLocation, messageConsumer, resetHandler); - addServices(backgroundResourceAsApiService(client)); + addServices(autoCloseableAsApiService(client)); } // ProxyService implementation. diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/BatchingCommitterTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/BatchingCommitterTest.java new file mode 100644 index 000000000..c1bd12de8 --- /dev/null +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/BatchingCommitterTest.java @@ -0,0 +1,116 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.internal.wire; + +import static com.google.cloud.pubsublite.internal.UncheckedApiPreconditions.checkState; +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.api.core.SettableApiFuture; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.internal.AlarmFactory; +import com.google.cloud.pubsublite.internal.testing.FakeApiService; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Spy; + +@RunWith(JUnit4.class) +public class BatchingCommitterTest { + abstract static class FakeCommitter extends FakeApiService implements Committer {} + + @Spy + FakeCommitter underlying; + @Mock + AlarmFactory alarmFactory; + + private BatchingCommitter committer; + + private Runnable flushAlarm; + + @Before + public void setUp() { + initMocks(this); + when(alarmFactory.newAlarm(any())).thenAnswer(args -> { + flushAlarm = args.getArgument(0); + return SettableApiFuture.create(); + }); + committer = new BatchingCommitter(underlying, alarmFactory); + checkState(flushAlarm != null); + committer.startAsync().awaitRunning(); + } + + @Test + public void batchesRequests() throws Exception { + ApiFuture f1 = committer.commitOffset(Offset.of(1)); + ApiFuture f2 = committer.commitOffset(Offset.of(2)); + ApiFuture f3 = committer.commitOffset(Offset.of(3)); + verify(underlying, times(0)).commitOffset(any()); + assertThat(f1.isDone()).isFalse(); + assertThat(f2.isDone()).isFalse(); + assertThat(f3.isDone()).isFalse(); + SettableApiFuture underlyingFuture3 = SettableApiFuture.create(); + when(underlying.commitOffset(Offset.of(3))).thenReturn(underlyingFuture3); + flushAlarm.run(); + verify(underlying, times(1)).commitOffset(Offset.of(3)); + assertThat(f1.isDone()).isFalse(); + assertThat(f2.isDone()).isFalse(); + assertThat(f3.isDone()).isFalse(); + ApiFuture f4 = committer.commitOffset(Offset.of(4)); + SettableApiFuture underlyingFuture4 = SettableApiFuture.create(); + when(underlying.commitOffset(Offset.of(4))).thenReturn(underlyingFuture4); + flushAlarm.run(); + verify(underlying, times(1)).commitOffset(Offset.of(4)); + assertThat(f4.isDone()).isFalse(); + underlyingFuture3.set(null); + f1.get(); + f2.get(); + f3.get(); + assertThat(f4.isDone()).isFalse(); + underlyingFuture4.setException(new Exception("Some error")); + assertThrows(Exception.class, f4::get); + } + + @Test + public void waitUntilEmptyFlushes() throws Exception { + ApiFuture f1 = committer.commitOffset(Offset.of(1)); + verify(underlying, times(0)).commitOffset(any()); + when(underlying.commitOffset(Offset.of(1))).thenReturn(ApiFutures.immediateFuture(null)); + committer.waitUntilEmpty(); + verify(underlying).waitUntilEmpty(); + f1.get(); + } + + @Test + public void shutdownFlushes() throws Exception { + ApiFuture f1 = committer.commitOffset(Offset.of(1)); + verify(underlying, times(0)).commitOffset(any()); + when(underlying.commitOffset(Offset.of(1))).thenReturn(ApiFutures.immediateFuture(null)); + committer.stopAsync().awaitTerminated(); + verify(underlying).commitOffset(Offset.of(1)); + f1.get(); + } +} From 07c38cc6f3b5499d87cd2cfa129ccff608e23ad6 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Wed, 15 Sep 2021 15:48:54 -0400 Subject: [PATCH 2/2] fix: reformat --- .../pubsublite/internal/MoreApiFutures.java | 29 +++++++++++-------- .../internal/wire/ApiServiceUtils.java | 1 - .../internal/wire/BatchingCommitter.java | 1 + .../internal/wire/CommitterSettings.java | 7 +++-- .../wire/SinglePartitionPublisher.java | 3 +- .../internal/wire/BatchingCommitterTest.java | 16 +++++----- 6 files changed, 31 insertions(+), 26 deletions(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/MoreApiFutures.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/MoreApiFutures.java index b625151ea..3869c06d2 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/MoreApiFutures.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/MoreApiFutures.java @@ -23,18 +23,23 @@ import com.google.cloud.pubsublite.internal.wire.SystemExecutors; public final class MoreApiFutures { -private MoreApiFutures() {} - public static void connectFutures(ApiFuture source, SettableApiFuture toConnect) { - ApiFutures.addCallback(source, new ApiFutureCallback() { - @Override - public void onFailure(Throwable throwable) { - toConnect.setException(throwable); - } + private MoreApiFutures() {} - @Override - public void onSuccess(T t) { - toConnect.set(t); - } - }, SystemExecutors.getFuturesExecutor()); + public static void connectFutures( + ApiFuture source, SettableApiFuture toConnect) { + ApiFutures.addCallback( + source, + new ApiFutureCallback() { + @Override + public void onFailure(Throwable throwable) { + toConnect.setException(throwable); + } + + @Override + public void onSuccess(T t) { + toConnect.set(t); + } + }, + SystemExecutors.getFuturesExecutor()); } } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiServiceUtils.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiServiceUtils.java index e9f54258d..c16cef357 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiServiceUtils.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiServiceUtils.java @@ -20,7 +20,6 @@ import com.google.api.core.AbstractApiService; import com.google.api.core.ApiService; -import com.google.api.gax.core.BackgroundResource; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.common.collect.ImmutableList; diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchingCommitter.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchingCommitter.java index 2f7c12bc1..711a58cbd 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchingCommitter.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchingCommitter.java @@ -32,6 +32,7 @@ public class BatchingCommitter extends ProxyService implements Committer { @GuardedBy("this") private SettableApiFuture currentFuture = SettableApiFuture.create(); + @GuardedBy("this") private Optional currentOffset = Optional.empty(); diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterSettings.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterSettings.java index 3d39e8cb1..4342837b6 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterSettings.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterSettings.java @@ -55,8 +55,9 @@ public Committer instantiate() { .setSubscription(subscriptionPath().toString()) .setPartition(partition().value()) .build(); - return new ApiExceptionCommitter(new BatchingCommitter( - new CommitterImpl(serviceClient(), initialCommitCursorRequest), - AlarmFactory.create(Duration.ofMillis(50)))); + return new ApiExceptionCommitter( + new BatchingCommitter( + new CommitterImpl(serviceClient(), initialCommitCursorRequest), + AlarmFactory.create(Duration.ofMillis(50)))); } } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisher.java index b5708d591..9b9880c57 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisher.java @@ -27,8 +27,7 @@ import com.google.cloud.pubsublite.internal.Publisher; import java.io.IOException; -public class SinglePartitionPublisher extends ProxyService - implements Publisher { +public class SinglePartitionPublisher extends ProxyService implements Publisher { private final Publisher publisher; private final Partition partition; diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/BatchingCommitterTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/BatchingCommitterTest.java index c1bd12de8..2601fc379 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/BatchingCommitterTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/BatchingCommitterTest.java @@ -42,10 +42,8 @@ public class BatchingCommitterTest { abstract static class FakeCommitter extends FakeApiService implements Committer {} - @Spy - FakeCommitter underlying; - @Mock - AlarmFactory alarmFactory; + @Spy FakeCommitter underlying; + @Mock AlarmFactory alarmFactory; private BatchingCommitter committer; @@ -54,10 +52,12 @@ abstract static class FakeCommitter extends FakeApiService implements Committer @Before public void setUp() { initMocks(this); - when(alarmFactory.newAlarm(any())).thenAnswer(args -> { - flushAlarm = args.getArgument(0); - return SettableApiFuture.create(); - }); + when(alarmFactory.newAlarm(any())) + .thenAnswer( + args -> { + flushAlarm = args.getArgument(0); + return SettableApiFuture.create(); + }); committer = new BatchingCommitter(underlying, alarmFactory); checkState(flushAlarm != null); committer.startAsync().awaitRunning();