diff --git a/pubsublite-beam-io/pom.xml b/pubsublite-beam-io/pom.xml
index 7a33ce1d2..099247964 100644
--- a/pubsublite-beam-io/pom.xml
+++ b/pubsublite-beam-io/pom.xml
@@ -27,8 +27,58 @@
com.google.auto.service
- auto-service
- 1.0-rc7
+ auto-service-annotations
+
+
+ com.google.protobuf
+ protobuf-java-util
+
+
+ com.google.code.findbugs
+ jsr305
+
+
+ com.google.api
+ gax
+
+
+ com.google.errorprone
+ error_prone_annotations
+
+
+ com.google.api
+ api-common
+
+
+ com.google.auto.value
+ auto-value-annotations
+
+
+ org.slf4j
+ slf4j-api
+ 1.7.30
+
+
+ joda-time
+ joda-time
+ 2.10.5
+
+
+ org.checkerframework
+ checker-qual
+ 3.7.0
+
+
+ com.google.guava
+ guava
+
+
+ com.google.protobuf
+ protobuf-java
+
+
+ javax.annotation
+ javax.annotation-api
org.apache.beam
@@ -51,6 +101,12 @@
junit
test
+
+ org.hamcrest
+ hamcrest-core
+ 1.3
+ test
+
org.hamcrest
hamcrest
@@ -74,6 +130,14 @@
true
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+ true
+
+
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/LimitingTopicBacklogReader.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/LimitingTopicBacklogReader.java
index 2e2043249..86e3dace1 100644
--- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/LimitingTopicBacklogReader.java
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/LimitingTopicBacklogReader.java
@@ -17,18 +17,18 @@
package com.google.cloud.pubsublite.beam;
import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkNotNull;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
+import com.google.common.base.Ticker;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ticker;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
final class LimitingTopicBacklogReader implements TopicBacklogReader {
private final TopicBacklogReader underlying;
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTracker.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTracker.java
index d3b01e351..c720b6b73 100644
--- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTracker.java
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTracker.java
@@ -16,19 +16,19 @@
package com.google.cloud.pubsublite.beam;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
+import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch;
import org.joda.time.Duration;
/**
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdf.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdf.java
index c1d8b86db..3c1f97435 100644
--- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdf.java
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdf.java
@@ -17,6 +17,7 @@
package com.google.cloud.pubsublite.beam;
import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.sdk.io.range.OffsetRange;
@@ -69,8 +70,7 @@ public MonotonicallyIncreasing newWatermarkEstimator(@WatermarkEstimatorState In
public ProcessContinuation processElement(
RestrictionTracker tracker,
@Element SubscriptionPartition subscriptionPartition,
- OutputReceiver receiver,
- BundleFinalizer finalizer)
+ OutputReceiver receiver)
throws Exception {
try (SubscriptionPartitionProcessor processor =
processorFactory.newProcessor(subscriptionPartition, tracker, receiver)) {
@@ -79,16 +79,17 @@ public ProcessContinuation processElement(
processor
.lastClaimed()
.ifPresent(
- lastClaimedOffset ->
- finalizer.afterBundleCommit(
- Instant.ofEpochMilli(Long.MAX_VALUE),
- () -> {
- Committer committer = committerFactory.apply(subscriptionPartition);
- committer.startAsync().awaitRunning();
- // Commit the next-to-deliver offset.
- committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get();
- committer.stopAsync().awaitTerminated();
- }));
+ lastClaimedOffset -> {
+ Committer committer = committerFactory.apply(subscriptionPartition);
+ committer.startAsync().awaitRunning();
+ // Commit the next-to-deliver offset.
+ try {
+ committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get();
+ } catch (Exception e) {
+ throw ExtractStatus.toCanonical(e).underlying;
+ }
+ committer.stopAsync().awaitTerminated();
+ });
return result;
}
}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PublisherCache.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PublisherCache.java
index 397a6b893..507484912 100644
--- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PublisherCache.java
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PublisherCache.java
@@ -24,11 +24,11 @@
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.Publisher;
+import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.HashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
/** A map of working publishers by PublisherOptions. */
class PublisherCache {
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/Publishers.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/Publishers.java
index f8305ea15..ad7e2729b 100644
--- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/Publishers.java
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/Publishers.java
@@ -36,7 +36,7 @@
import com.google.cloud.pubsublite.v1.AdminServiceSettings;
import com.google.cloud.pubsublite.v1.PublisherServiceClient;
import com.google.cloud.pubsublite.v1.PublisherServiceSettings;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken;
+import com.google.common.reflect.TypeToken;
class Publishers {
private static final Framework FRAMEWORK = Framework.of("BEAM");
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteSink.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteSink.java
index 27facddc7..e2011695f 100644
--- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteSink.java
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteSink.java
@@ -29,6 +29,7 @@
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.proto.PubSubMessage;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayDeque;
import java.util.Deque;
@@ -36,7 +37,6 @@
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
/** A sink which publishes messages to Pub/Sub Lite. */
@SuppressWarnings({
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java
index 46887809e..b9f635f84 100644
--- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java
@@ -27,6 +27,8 @@
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.proto.SequencedMessage;
+import com.google.common.base.Stopwatch;
+import com.google.common.math.LongMath;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -38,8 +40,6 @@
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath;
import org.joda.time.Duration;
class SubscribeTransform extends PTransform> {
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java
index 6919b0ced..066c331b4 100644
--- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java
@@ -34,11 +34,13 @@
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
+import com.google.cloud.pubsublite.v1.CursorServiceClient;
+import com.google.cloud.pubsublite.v1.CursorServiceSettings;
import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
import com.google.cloud.pubsublite.v1.SubscriberServiceSettings;
+import com.google.common.collect.ImmutableSet;
import java.io.Serializable;
import java.util.Set;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.checkerframework.checker.nullness.qual.Nullable;
@AutoValue
@@ -127,6 +129,16 @@ SubscriberFactory getSubscriberFactory(Partition partition) {
.build();
}
+ private CursorServiceClient newCursorServiceClient() throws ApiException {
+ try {
+ return CursorServiceClient.create(
+ addDefaultSettings(
+ subscriptionPath().location().region(), CursorServiceSettings.newBuilder()));
+ } catch (Throwable t) {
+ throw toCanonical(t).underlying;
+ }
+ }
+
Committer getCommitter(Partition partition) {
SerializableSupplier supplier = committerSupplier();
if (supplier != null) {
@@ -135,6 +147,7 @@ Committer getCommitter(Partition partition) {
return CommitterSettings.newBuilder()
.setSubscriptionPath(subscriptionPath())
.setPartition(partition)
+ .setServiceClient(newCursorServiceClient())
.build()
.instantiate();
}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionLoader.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionLoader.java
index 4bcab599c..0aec8f47d 100644
--- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionLoader.java
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionLoader.java
@@ -16,12 +16,14 @@
package com.google.cloud.pubsublite.beam;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkArgument;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.PartitionLookupUtils;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -36,8 +38,6 @@
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.joda.time.Instant;
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorImpl.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorImpl.java
index 136fbca1a..2d7a902e2 100644
--- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorImpl.java
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorImpl.java
@@ -27,6 +27,9 @@
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SequencedMessage;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.util.Timestamps;
import java.util.List;
import java.util.Optional;
@@ -39,9 +42,6 @@
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
import org.joda.time.Duration;
import org.joda.time.Instant;
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderImpl.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderImpl.java
index a3e0c0e28..a8e80b8bc 100644
--- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderImpl.java
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderImpl.java
@@ -17,7 +17,7 @@
package com.google.cloud.pubsublite.beam;
import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkNotNull;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Offset;
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderSettings.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderSettings.java
index 114cc19d7..0fbf8f6d8 100644
--- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderSettings.java
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderSettings.java
@@ -16,7 +16,7 @@
package com.google.cloud.pubsublite.beam;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkNotNull;
import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
@@ -28,10 +28,10 @@
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.TopicStatsClient;
import com.google.cloud.pubsublite.internal.TopicStatsClientSettings;
+import com.google.common.base.Ticker;
import java.io.Serializable;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nonnull;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ticker;
@AutoValue
abstract class TopicBacklogReaderSettings implements Serializable {
diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/AddUuidsTransformTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/AddUuidsTransformTest.java
index f52ac4c40..0abba2190 100644
--- a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/AddUuidsTransformTest.java
+++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/AddUuidsTransformTest.java
@@ -17,6 +17,8 @@
package com.google.cloud.pubsublite.beam;
import com.google.cloud.pubsublite.proto.PubSubMessage;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import java.util.HashSet;
import java.util.Set;
@@ -26,8 +28,6 @@
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTrackerTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTrackerTest.java
index 47e5ae54c..9db34e184 100644
--- a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTrackerTest.java
+++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTrackerTest.java
@@ -30,11 +30,11 @@
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ticker;
import org.joda.time.Duration;
import org.junit.Before;
import org.junit.Test;
diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java
index 5516e07aa..c0980e466 100644
--- a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java
+++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java
@@ -24,7 +24,6 @@
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.verify;
@@ -44,17 +43,14 @@
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.Optional;
-import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import org.apache.beam.sdk.io.range.OffsetRange;
-import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.joda.time.Duration;
-import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -62,7 +58,6 @@
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Spy;
-import org.mockito.stubbing.Answer;
@RunWith(JUnit4.class)
@SuppressWarnings("initialization.fields.uninitialized")
@@ -86,7 +81,6 @@ public class PerSubscriptionPartitionSdfTest {
@Mock InitialOffsetReader initialOffsetReader;
@Spy RestrictionTracker tracker;
@Mock OutputReceiver output;
- @Mock BundleFinalizer finalizer;
@Mock SubscriptionPartitionProcessor processor;
abstract static class FakeCommitter extends FakeApiService implements Committer {}
@@ -147,28 +141,15 @@ public void process() throws Exception {
assertFalse(wrapped.tryClaim(OffsetByteProgress.of(Offset.of(333333), 123)));
return processor;
});
- AtomicReference callbackRef = new AtomicReference<>(null);
- doAnswer(
- (Answer)
- args -> {
- callbackRef.set(args.getArgument(1));
- return null;
- })
- .when(finalizer)
- .afterBundleCommit(any(), any());
doReturn(Optional.of(example(Offset.class))).when(processor).lastClaimed();
- assertEquals(
- ProcessContinuation.resume(), sdf.processElement(tracker, PARTITION, output, finalizer));
+ when(committer.commitOffset(any())).thenReturn(ApiFutures.immediateFuture(null));
+ assertEquals(ProcessContinuation.resume(), sdf.processElement(tracker, PARTITION, output));
verify(processorFactory).newProcessor(eq(PARTITION), any(), eq(output));
InOrder order = inOrder(processor);
order.verify(processor).start();
order.verify(processor).waitForCompletion(MAX_SLEEP_TIME);
order.verify(processor).lastClaimed();
order.verify(processor).close();
- verify(finalizer).afterBundleCommit(eq(Instant.ofEpochMilli(Long.MAX_VALUE)), any());
- assertTrue(callbackRef.get() != null);
- when(committer.commitOffset(any())).thenReturn(ApiFutures.immediateFuture(null));
- callbackRef.get().onBundleSuccess();
InOrder order2 = inOrder(committerFactory, committer);
order2.verify(committer).startAsync();
order2.verify(committer).awaitRunning();
diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorImplTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorImplTest.java
index cdca6c470..f03cdadb5 100644
--- a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorImplTest.java
+++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorImplTest.java
@@ -41,6 +41,7 @@
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SequencedMessage;
+import com.google.common.collect.ImmutableList;
import com.google.protobuf.util.Timestamps;
import java.util.List;
import java.util.function.Consumer;
@@ -49,7 +50,6 @@
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/UuidDeduplicationTransformTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/UuidDeduplicationTransformTest.java
index ec05cdca6..493722342 100644
--- a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/UuidDeduplicationTransformTest.java
+++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/UuidDeduplicationTransformTest.java
@@ -20,6 +20,8 @@
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.SequencedMessage;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
@@ -28,8 +30,6 @@
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Deduplicate;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;