diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/MultiPartitionSubscriber.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/MultiPartitionSubscriber.java index e8dfddb72..f6cb10f98 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/MultiPartitionSubscriber.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/MultiPartitionSubscriber.java @@ -18,12 +18,12 @@ import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.cloudpubsub.Subscriber; -import com.google.cloud.pubsublite.internal.TrivialProxyService; +import com.google.cloud.pubsublite.internal.ProxyService; import java.util.List; // A MultiPartitionSubscriber wraps multiple subscribers into a single ApiService that can be // interacted with. If any single subscriber fails, all others are stopped. -public class MultiPartitionSubscriber extends TrivialProxyService implements Subscriber { +public class MultiPartitionSubscriber extends ProxyService implements Subscriber { public static Subscriber of(List subscribers) throws ApiException { return new MultiPartitionSubscriber(subscribers); } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java index 21fcc933c..8d1c6f67f 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java @@ -26,13 +26,13 @@ import com.google.cloud.pubsublite.MessageTransformer; import com.google.cloud.pubsublite.cloudpubsub.Publisher; import com.google.cloud.pubsublite.internal.CheckedApiException; -import com.google.cloud.pubsublite.internal.TrivialProxyService; +import com.google.cloud.pubsublite.internal.ProxyService; import com.google.cloud.pubsublite.internal.wire.SystemExecutors; import com.google.pubsub.v1.PubsubMessage; // A WrappingPublisher wraps the wire protocol client with a Cloud Pub/Sub api compliant // publisher. It encodes a MessageMetadata object in the response string. -public class WrappingPublisher extends TrivialProxyService implements Publisher { +public class WrappingPublisher extends ProxyService implements Publisher { private final com.google.cloud.pubsublite.internal.Publisher wirePublisher; private final MessageTransformer transformer; diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/MoreApiFutures.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/MoreApiFutures.java new file mode 100644 index 000000000..3869c06d2 --- /dev/null +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/MoreApiFutures.java @@ -0,0 +1,45 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.internal; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.core.SettableApiFuture; +import com.google.cloud.pubsublite.internal.wire.SystemExecutors; + +public final class MoreApiFutures { + private MoreApiFutures() {} + + public static void connectFutures( + ApiFuture source, SettableApiFuture toConnect) { + ApiFutures.addCallback( + source, + new ApiFutureCallback() { + @Override + public void onFailure(Throwable throwable) { + toConnect.setException(throwable); + } + + @Override + public void onSuccess(T t) { + toConnect.set(t); + } + }, + SystemExecutors.getFuturesExecutor()); + } +} diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/ProxyService.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/ProxyService.java index 7b55658cc..8947e6315 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/ProxyService.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/ProxyService.java @@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.flogger.GoogleLogger; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -41,7 +42,13 @@ public abstract class ProxyService extends AbstractApiService { private final List services = new ArrayList<>(); private final AtomicBoolean stoppedOrFailed = new AtomicBoolean(false); - protected ProxyService() {} + protected ProxyService(Collection services) { + addServices(services); + } + + protected ProxyService(ApiService... services) throws ApiException { + this(Arrays.asList(services)); + } // Add a new ApiServices to this. Requires that all of them are in state NEW and this is in state // NEW. @@ -59,13 +66,13 @@ protected final void addServices(ApiService... services) throws ApiException { } // Method to be called on service start after dependent services start. - protected abstract void start() throws CheckedApiException; + protected void start() throws CheckedApiException {} // Method to be called on service stop before dependent services stop. - protected abstract void stop() throws CheckedApiException; + protected void stop() throws CheckedApiException {} // Method to be called for class-specific permanent error handling after trying to stop all other // services. May not throw. - protected abstract void handlePermanentError(CheckedApiException error); + protected void handlePermanentError(CheckedApiException error) {} // Tries to stop all dependent services and sets this service into the FAILED state. protected final void onPermanentError(CheckedApiException error) { diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/TrivialProxyService.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/TrivialProxyService.java deleted file mode 100644 index ea8285ac6..000000000 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/TrivialProxyService.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsublite.internal; - -import com.google.api.core.ApiService; -import com.google.api.gax.rpc.ApiException; -import java.util.Arrays; -import java.util.Collection; - -/** A ProxyService that just wraps all ApiService methods. */ -public class TrivialProxyService extends ProxyService { - protected TrivialProxyService(Collection services) throws ApiException { - super(); - addServices(services); - } - - protected TrivialProxyService(ApiService... services) throws ApiException { - this(Arrays.asList(services)); - } - - @Override - protected final void start() {} - - @Override - protected final void stop() {} - - @Override - protected final void handlePermanentError(CheckedApiException error) {} -} diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiExceptionCommitter.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiExceptionCommitter.java index 30612f0ee..930e9d561 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiExceptionCommitter.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiExceptionCommitter.java @@ -22,9 +22,9 @@ import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.internal.CheckedApiException; -import com.google.cloud.pubsublite.internal.TrivialProxyService; +import com.google.cloud.pubsublite.internal.ProxyService; -class ApiExceptionCommitter extends TrivialProxyService implements Committer { +class ApiExceptionCommitter extends ProxyService implements Committer { private final Committer committer; ApiExceptionCommitter(Committer committer) throws ApiException { diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiExceptionPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiExceptionPublisher.java index 30ad6cca4..fe06d066b 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiExceptionPublisher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiExceptionPublisher.java @@ -21,11 +21,11 @@ import com.google.api.core.ApiFuture; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.internal.ProxyService; import com.google.cloud.pubsublite.internal.Publisher; -import com.google.cloud.pubsublite.internal.TrivialProxyService; import java.io.IOException; -public class ApiExceptionPublisher extends TrivialProxyService implements Publisher { +public class ApiExceptionPublisher extends ProxyService implements Publisher { private final Publisher publisher; ApiExceptionPublisher(Publisher publisher) throws ApiException { diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiServiceUtils.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiServiceUtils.java index 40e0bbc1b..c16cef357 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiServiceUtils.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiServiceUtils.java @@ -20,7 +20,6 @@ import com.google.api.core.AbstractApiService; import com.google.api.core.ApiService; -import com.google.api.gax.core.BackgroundResource; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.common.collect.ImmutableList; @@ -31,7 +30,7 @@ public class ApiServiceUtils { private ApiServiceUtils() {} - public static ApiService backgroundResourceAsApiService(BackgroundResource resource) { + public static ApiService autoCloseableAsApiService(AutoCloseable resource) { return new AbstractApiService() { @Override protected void doStart() { diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerImpl.java index 2606f8fea..ad8161ede 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerImpl.java @@ -16,13 +16,13 @@ package com.google.cloud.pubsublite.internal.wire; -import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.backgroundResourceAsApiService; +import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.autoCloseableAsApiService; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.cloud.pubsublite.internal.CloseableMonitor; -import com.google.cloud.pubsublite.internal.TrivialProxyService; +import com.google.cloud.pubsublite.internal.ProxyService; import com.google.cloud.pubsublite.proto.InitialPartitionAssignmentRequest; import com.google.cloud.pubsublite.proto.PartitionAssignment; import com.google.cloud.pubsublite.proto.PartitionAssignmentRequest; @@ -33,7 +33,7 @@ import java.util.HashSet; import java.util.Set; -public class AssignerImpl extends TrivialProxyService +public class AssignerImpl extends ProxyService implements Assigner, RetryingConnectionObserver { private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); @@ -72,7 +72,7 @@ public AssignerImpl( new ConnectedAssignerImpl.Factory(), initialRequest, receiver); - addServices(backgroundResourceAsApiService(client)); + addServices(autoCloseableAsApiService(client)); } @Override diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchingCommitter.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchingCommitter.java new file mode 100644 index 000000000..711a58cbd --- /dev/null +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchingCommitter.java @@ -0,0 +1,72 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.internal.wire; + +import com.google.api.core.ApiFuture; +import com.google.api.core.SettableApiFuture; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.internal.AlarmFactory; +import com.google.cloud.pubsublite.internal.CheckedApiException; +import com.google.cloud.pubsublite.internal.MoreApiFutures; +import com.google.cloud.pubsublite.internal.ProxyService; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import java.util.Optional; +import java.util.concurrent.Future; + +public class BatchingCommitter extends ProxyService implements Committer { + private final Committer underlying; + + @GuardedBy("this") + private SettableApiFuture currentFuture = SettableApiFuture.create(); + + @GuardedBy("this") + private Optional currentOffset = Optional.empty(); + + BatchingCommitter(Committer underlying, AlarmFactory alarmFactory) { + super(underlying); + this.underlying = underlying; + Future alarm = alarmFactory.newAlarm(this::flush); + addServices(ApiServiceUtils.autoCloseableAsApiService(() -> alarm.cancel(false))); + } + + @Override + public synchronized ApiFuture commitOffset(Offset offset) { + currentOffset = Optional.of(offset); + return currentFuture; + } + + @Override + public void waitUntilEmpty() throws CheckedApiException { + flush(); + underlying.waitUntilEmpty(); + } + + @Override + protected void stop() { + flush(); + } + + private synchronized void flush() { + if (!currentOffset.isPresent()) { + return; + } + ApiFuture underlyingFuture = underlying.commitOffset(currentOffset.get()); + MoreApiFutures.connectFutures(underlyingFuture, currentFuture); + currentOffset = Optional.empty(); + currentFuture = SettableApiFuture.create(); + } +} diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterImpl.java index a55bc3114..36920b34e 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterImpl.java @@ -17,7 +17,7 @@ package com.google.cloud.pubsublite.internal.wire; import static com.google.cloud.pubsublite.internal.CheckedApiPreconditions.checkState; -import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.backgroundResourceAsApiService; +import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.autoCloseableAsApiService; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; @@ -81,7 +81,7 @@ public CommitterImpl(CursorServiceClient client, InitialCommitCursorRequest requ stream -> client.streamingCommitCursorCallable().splitCall(stream), new ConnectedCommitterImpl.Factory(), request); - addServices(backgroundResourceAsApiService(client)); + addServices(autoCloseableAsApiService(client)); } // ProxyService implementation. @@ -94,9 +94,6 @@ protected void handlePermanentError(CheckedApiException error) { } } - @Override - protected void start() {} - @Override protected void stop() { try (CloseableMonitor.Hold h = monitor.enter()) { diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterSettings.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterSettings.java index 45c663163..4342837b6 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterSettings.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterSettings.java @@ -19,8 +19,10 @@ import com.google.auto.value.AutoValue; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.SubscriptionPath; +import com.google.cloud.pubsublite.internal.AlarmFactory; import com.google.cloud.pubsublite.proto.InitialCommitCursorRequest; import com.google.cloud.pubsublite.v1.CursorServiceClient; +import java.time.Duration; @AutoValue public abstract class CommitterSettings { @@ -54,6 +56,8 @@ public Committer instantiate() { .setPartition(partition().value()) .build(); return new ApiExceptionCommitter( - new CommitterImpl(serviceClient(), initialCommitCursorRequest)); + new BatchingCommitter( + new CommitterImpl(serviceClient(), initialCommitCursorRequest), + AlarmFactory.create(Duration.ofMillis(50)))); } } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java index de394cd16..8481d089a 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java @@ -200,9 +200,6 @@ private void handleConfig(long partitionCount) { } } - @Override - protected void start() {} - @Override protected void stop() { Optional current; diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java index 4e6d373c9..905c11710 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java @@ -17,7 +17,7 @@ package com.google.cloud.pubsublite.internal.wire; import static com.google.cloud.pubsublite.internal.CheckedApiPreconditions.checkState; -import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.backgroundResourceAsApiService; +import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.autoCloseableAsApiService; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; @@ -136,7 +136,7 @@ public PublisherImpl( Objects.requireNonNull(batchingSettings.getDelayThreshold()).toNanos())), initialRequest, batchingSettings); - addServices(backgroundResourceAsApiService(client)); + addServices(autoCloseableAsApiService(client)); } @GuardedBy("monitor.monitor") diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisher.java index ab912202b..966b64b42 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisher.java @@ -26,13 +26,13 @@ import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.internal.CheckedApiException; +import com.google.cloud.pubsublite.internal.ProxyService; import com.google.cloud.pubsublite.internal.Publisher; import com.google.cloud.pubsublite.internal.RoutingPolicy; -import com.google.cloud.pubsublite.internal.TrivialProxyService; import java.io.IOException; import java.util.Map; -public class RoutingPublisher extends TrivialProxyService implements Publisher { +public class RoutingPublisher extends ProxyService implements Publisher { private final Map> partitionPublishers; private final RoutingPolicy policy; diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisher.java index f30c04b12..9b9880c57 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisher.java @@ -23,12 +23,11 @@ import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.internal.ProxyService; import com.google.cloud.pubsublite.internal.Publisher; -import com.google.cloud.pubsublite.internal.TrivialProxyService; import java.io.IOException; -public class SinglePartitionPublisher extends TrivialProxyService - implements Publisher { +public class SinglePartitionPublisher extends ProxyService implements Publisher { private final Publisher publisher; private final Partition partition; diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java index 73771855a..b3afaabff 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java @@ -17,7 +17,7 @@ package com.google.cloud.pubsublite.internal.wire; import static com.google.cloud.pubsublite.internal.CheckedApiPreconditions.checkArgument; -import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.backgroundResourceAsApiService; +import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.autoCloseableAsApiService; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.SequencedMessage; @@ -107,7 +107,7 @@ public SubscriberImpl( initialLocation, messageConsumer, resetHandler); - addServices(backgroundResourceAsApiService(client)); + addServices(autoCloseableAsApiService(client)); } // ProxyService implementation. diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/BatchingCommitterTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/BatchingCommitterTest.java new file mode 100644 index 000000000..2601fc379 --- /dev/null +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/BatchingCommitterTest.java @@ -0,0 +1,116 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.internal.wire; + +import static com.google.cloud.pubsublite.internal.UncheckedApiPreconditions.checkState; +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.api.core.SettableApiFuture; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.internal.AlarmFactory; +import com.google.cloud.pubsublite.internal.testing.FakeApiService; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Spy; + +@RunWith(JUnit4.class) +public class BatchingCommitterTest { + abstract static class FakeCommitter extends FakeApiService implements Committer {} + + @Spy FakeCommitter underlying; + @Mock AlarmFactory alarmFactory; + + private BatchingCommitter committer; + + private Runnable flushAlarm; + + @Before + public void setUp() { + initMocks(this); + when(alarmFactory.newAlarm(any())) + .thenAnswer( + args -> { + flushAlarm = args.getArgument(0); + return SettableApiFuture.create(); + }); + committer = new BatchingCommitter(underlying, alarmFactory); + checkState(flushAlarm != null); + committer.startAsync().awaitRunning(); + } + + @Test + public void batchesRequests() throws Exception { + ApiFuture f1 = committer.commitOffset(Offset.of(1)); + ApiFuture f2 = committer.commitOffset(Offset.of(2)); + ApiFuture f3 = committer.commitOffset(Offset.of(3)); + verify(underlying, times(0)).commitOffset(any()); + assertThat(f1.isDone()).isFalse(); + assertThat(f2.isDone()).isFalse(); + assertThat(f3.isDone()).isFalse(); + SettableApiFuture underlyingFuture3 = SettableApiFuture.create(); + when(underlying.commitOffset(Offset.of(3))).thenReturn(underlyingFuture3); + flushAlarm.run(); + verify(underlying, times(1)).commitOffset(Offset.of(3)); + assertThat(f1.isDone()).isFalse(); + assertThat(f2.isDone()).isFalse(); + assertThat(f3.isDone()).isFalse(); + ApiFuture f4 = committer.commitOffset(Offset.of(4)); + SettableApiFuture underlyingFuture4 = SettableApiFuture.create(); + when(underlying.commitOffset(Offset.of(4))).thenReturn(underlyingFuture4); + flushAlarm.run(); + verify(underlying, times(1)).commitOffset(Offset.of(4)); + assertThat(f4.isDone()).isFalse(); + underlyingFuture3.set(null); + f1.get(); + f2.get(); + f3.get(); + assertThat(f4.isDone()).isFalse(); + underlyingFuture4.setException(new Exception("Some error")); + assertThrows(Exception.class, f4::get); + } + + @Test + public void waitUntilEmptyFlushes() throws Exception { + ApiFuture f1 = committer.commitOffset(Offset.of(1)); + verify(underlying, times(0)).commitOffset(any()); + when(underlying.commitOffset(Offset.of(1))).thenReturn(ApiFutures.immediateFuture(null)); + committer.waitUntilEmpty(); + verify(underlying).waitUntilEmpty(); + f1.get(); + } + + @Test + public void shutdownFlushes() throws Exception { + ApiFuture f1 = committer.commitOffset(Offset.of(1)); + verify(underlying, times(0)).commitOffset(any()); + when(underlying.commitOffset(Offset.of(1))).thenReturn(ApiFutures.immediateFuture(null)); + committer.stopAsync().awaitTerminated(); + verify(underlying).commitOffset(Offset.of(1)); + f1.get(); + } +}