From 9b4eca11afb5120e1f96ec499eca14f38dcfb42a Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Tue, 23 Feb 2021 15:16:29 -0500 Subject: [PATCH 01/13] fix: Fix per-partition SDF to not use BundleFinalizer --- .../beam/PerSubscriptionPartitionSdf.java | 17 +++++++------- .../beam/PerSubscriptionPartitionSdfTest.java | 22 ++----------------- 2 files changed, 11 insertions(+), 28 deletions(-) diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdf.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdf.java index c1d8b86db..217bc2fd0 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdf.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdf.java @@ -17,6 +17,7 @@ package com.google.cloud.pubsublite.beam; import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.internal.ExtractStatus; import com.google.cloud.pubsublite.internal.wire.Committer; import com.google.cloud.pubsublite.proto.SequencedMessage; import org.apache.beam.sdk.io.range.OffsetRange; @@ -69,8 +70,7 @@ public MonotonicallyIncreasing newWatermarkEstimator(@WatermarkEstimatorState In public ProcessContinuation processElement( RestrictionTracker tracker, @Element SubscriptionPartition subscriptionPartition, - OutputReceiver receiver, - BundleFinalizer finalizer) + OutputReceiver receiver) throws Exception { try (SubscriptionPartitionProcessor processor = processorFactory.newProcessor(subscriptionPartition, tracker, receiver)) { @@ -79,16 +79,17 @@ public ProcessContinuation processElement( processor .lastClaimed() .ifPresent( - lastClaimedOffset -> - finalizer.afterBundleCommit( - Instant.ofEpochMilli(Long.MAX_VALUE), - () -> { + lastClaimedOffset -> { Committer committer = committerFactory.apply(subscriptionPartition); committer.startAsync().awaitRunning(); // Commit the next-to-deliver offset. - committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get(); + try { + committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get(); + } catch (Exception e) { + throw ExtractStatus.toCanonical(e).underlying; + } committer.stopAsync().awaitTerminated(); - })); + }); return result; } } diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java index 5516e07aa..a596eba57 100644 --- a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java +++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.verify; @@ -44,17 +43,14 @@ import java.io.ByteArrayOutputStream; import java.io.ObjectOutputStream; import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nonnull; import org.apache.beam.sdk.io.range.OffsetRange; -import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.SerializableBiFunction; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.joda.time.Duration; -import org.joda.time.Instant; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -62,7 +58,6 @@ import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.Spy; -import org.mockito.stubbing.Answer; @RunWith(JUnit4.class) @SuppressWarnings("initialization.fields.uninitialized") @@ -86,7 +81,6 @@ public class PerSubscriptionPartitionSdfTest { @Mock InitialOffsetReader initialOffsetReader; @Spy RestrictionTracker tracker; @Mock OutputReceiver output; - @Mock BundleFinalizer finalizer; @Mock SubscriptionPartitionProcessor processor; abstract static class FakeCommitter extends FakeApiService implements Committer {} @@ -147,28 +141,16 @@ public void process() throws Exception { assertFalse(wrapped.tryClaim(OffsetByteProgress.of(Offset.of(333333), 123))); return processor; }); - AtomicReference callbackRef = new AtomicReference<>(null); - doAnswer( - (Answer) - args -> { - callbackRef.set(args.getArgument(1)); - return null; - }) - .when(finalizer) - .afterBundleCommit(any(), any()); doReturn(Optional.of(example(Offset.class))).when(processor).lastClaimed(); + when(committer.commitOffset(any())).thenReturn(ApiFutures.immediateFuture(null)); assertEquals( - ProcessContinuation.resume(), sdf.processElement(tracker, PARTITION, output, finalizer)); + ProcessContinuation.resume(), sdf.processElement(tracker, PARTITION, output)); verify(processorFactory).newProcessor(eq(PARTITION), any(), eq(output)); InOrder order = inOrder(processor); order.verify(processor).start(); order.verify(processor).waitForCompletion(MAX_SLEEP_TIME); order.verify(processor).lastClaimed(); order.verify(processor).close(); - verify(finalizer).afterBundleCommit(eq(Instant.ofEpochMilli(Long.MAX_VALUE)), any()); - assertTrue(callbackRef.get() != null); - when(committer.commitOffset(any())).thenReturn(ApiFutures.immediateFuture(null)); - callbackRef.get().onBundleSuccess(); InOrder order2 = inOrder(committerFactory, committer); order2.verify(committer).startAsync(); order2.verify(committer).awaitRunning(); From cbd06f2ed3877e59cf6d753a5d00879f1973f74e Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Tue, 23 Feb 2021 15:35:05 -0500 Subject: [PATCH 02/13] feat: reimport --- pubsublite-beam-io/pom.xml | 48 ++++++++++++++++++- .../beam/LimitingTopicBacklogReader.java | 10 ++-- .../beam/OffsetByteRangeTracker.java | 8 ++-- .../cloud/pubsublite/beam/PublisherCache.java | 2 +- .../cloud/pubsublite/beam/Publishers.java | 2 +- .../cloud/pubsublite/beam/PubsubLiteSink.java | 2 +- .../pubsublite/beam/SubscribeTransform.java | 4 +- .../pubsublite/beam/SubscriberOptions.java | 2 +- .../beam/SubscriptionPartitionLoader.java | 6 +-- .../SubscriptionPartitionProcessorImpl.java | 6 +-- .../beam/TopicBacklogReaderImpl.java | 2 +- .../beam/TopicBacklogReaderSettings.java | 4 +- .../beam/AddUuidsTransformTest.java | 4 +- .../beam/OffsetByteRangeTrackerTest.java | 4 +- ...ubscriptionPartitionProcessorImplTest.java | 2 +- .../beam/UuidDeduplicationTransformTest.java | 4 +- 16 files changed, 77 insertions(+), 33 deletions(-) diff --git a/pubsublite-beam-io/pom.xml b/pubsublite-beam-io/pom.xml index 7a33ce1d2..e15d66fcc 100644 --- a/pubsublite-beam-io/pom.xml +++ b/pubsublite-beam-io/pom.xml @@ -27,8 +27,46 @@ com.google.auto.service - auto-service - 1.0-rc7 + auto-service-annotations + + + com.google.protobuf + protobuf-java-util + + + com.google.code.findbugs + jsr305 + + + com.google.api + gax + + + com.google.errorprone + error_prone_annotations + + + com.google.api + api-common + + + com.google.auto.value + auto-value-annotations + + + org.slf4j + slf4j-api + 1.7.30 + + + joda-time + joda-time + 2.10.5 + + + org.checkerframework + checker-qual + 3.7.0 org.apache.beam @@ -51,6 +89,12 @@ junit test + + org.hamcrest + hamcrest-core + 1.3 + test + org.hamcrest hamcrest diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/LimitingTopicBacklogReader.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/LimitingTopicBacklogReader.java index 2e2043249..86e3dace1 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/LimitingTopicBacklogReader.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/LimitingTopicBacklogReader.java @@ -17,18 +17,18 @@ package com.google.cloud.pubsublite.beam; import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkNotNull; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse; +import com.google.common.base.Ticker; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import com.google.errorprone.annotations.concurrent.GuardedBy; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ticker; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache; final class LimitingTopicBacklogReader implements TopicBacklogReader { private final TopicBacklogReader underlying; diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTracker.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTracker.java index d3b01e351..c720b6b73 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTracker.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTracker.java @@ -16,19 +16,19 @@ package com.google.cloud.pubsublite.beam; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse; +import com.google.common.base.Stopwatch; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress; import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch; import org.joda.time.Duration; /** diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PublisherCache.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PublisherCache.java index 397a6b893..507484912 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PublisherCache.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PublisherCache.java @@ -24,11 +24,11 @@ import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.internal.CloseableMonitor; import com.google.cloud.pubsublite.internal.Publisher; +import com.google.common.annotations.VisibleForTesting; import com.google.errorprone.annotations.concurrent.GuardedBy; import java.util.HashMap; import java.util.concurrent.Executor; import java.util.concurrent.Executors; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; /** A map of working publishers by PublisherOptions. */ class PublisherCache { diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/Publishers.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/Publishers.java index f8305ea15..ad7e2729b 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/Publishers.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/Publishers.java @@ -36,7 +36,7 @@ import com.google.cloud.pubsublite.v1.AdminServiceSettings; import com.google.cloud.pubsublite.v1.PublisherServiceClient; import com.google.cloud.pubsublite.v1.PublisherServiceSettings; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken; +import com.google.common.reflect.TypeToken; class Publishers { private static final Framework FRAMEWORK = Framework.of("BEAM"); diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteSink.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteSink.java index 27facddc7..e2011695f 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteSink.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteSink.java @@ -29,6 +29,7 @@ import com.google.cloud.pubsublite.internal.ExtractStatus; import com.google.cloud.pubsublite.internal.Publisher; import com.google.cloud.pubsublite.proto.PubSubMessage; +import com.google.common.util.concurrent.MoreExecutors; import com.google.errorprone.annotations.concurrent.GuardedBy; import java.util.ArrayDeque; import java.util.Deque; @@ -36,7 +37,6 @@ import java.util.concurrent.Executors; import java.util.function.Consumer; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors; /** A sink which publishes messages to Pub/Sub Lite. */ @SuppressWarnings({ diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java index 46887809e..b9f635f84 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java @@ -27,6 +27,8 @@ import com.google.cloud.pubsublite.internal.wire.Committer; import com.google.cloud.pubsublite.internal.wire.Subscriber; import com.google.cloud.pubsublite.proto.SequencedMessage; +import com.google.common.base.Stopwatch; +import com.google.common.math.LongMath; import java.util.List; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -38,8 +40,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath; import org.joda.time.Duration; class SubscribeTransform extends PTransform> { diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java index 6919b0ced..c8f2bcdad 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java @@ -36,9 +36,9 @@ import com.google.cloud.pubsublite.internal.wire.SubscriberFactory; import com.google.cloud.pubsublite.v1.SubscriberServiceClient; import com.google.cloud.pubsublite.v1.SubscriberServiceSettings; +import com.google.common.collect.ImmutableSet; import java.io.Serializable; import java.util.Set; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; import org.checkerframework.checker.nullness.qual.Nullable; @AutoValue diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionLoader.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionLoader.java index 4bcab599c..0aec8f47d 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionLoader.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionLoader.java @@ -16,12 +16,14 @@ package com.google.cloud.pubsublite.beam; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkArgument; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.PartitionLookupUtils; import com.google.cloud.pubsublite.SubscriptionPath; import com.google.cloud.pubsublite.TopicPath; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -36,8 +38,6 @@ import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.joda.time.Duration; import org.joda.time.Instant; diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorImpl.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorImpl.java index 136fbca1a..2d7a902e2 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorImpl.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorImpl.java @@ -27,6 +27,9 @@ import com.google.cloud.pubsublite.proto.FlowControlRequest; import com.google.cloud.pubsublite.proto.SeekRequest; import com.google.cloud.pubsublite.proto.SequencedMessage; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.util.Timestamps; import java.util.List; import java.util.Optional; @@ -39,9 +42,6 @@ import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture; import org.joda.time.Duration; import org.joda.time.Instant; diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderImpl.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderImpl.java index a3e0c0e28..a8e80b8bc 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderImpl.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderImpl.java @@ -17,7 +17,7 @@ package com.google.cloud.pubsublite.beam; import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkNotNull; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.Offset; diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderSettings.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderSettings.java index 114cc19d7..0fbf8f6d8 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderSettings.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderSettings.java @@ -16,7 +16,7 @@ package com.google.cloud.pubsublite.beam; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkNotNull; import com.google.api.gax.rpc.ApiException; import com.google.auto.value.AutoValue; @@ -28,10 +28,10 @@ import com.google.cloud.pubsublite.internal.ExtractStatus; import com.google.cloud.pubsublite.internal.TopicStatsClient; import com.google.cloud.pubsublite.internal.TopicStatsClientSettings; +import com.google.common.base.Ticker; import java.io.Serializable; import java.util.concurrent.ExecutionException; import javax.annotation.Nonnull; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ticker; @AutoValue abstract class TopicBacklogReaderSettings implements Serializable { diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/AddUuidsTransformTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/AddUuidsTransformTest.java index f52ac4c40..0abba2190 100644 --- a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/AddUuidsTransformTest.java +++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/AddUuidsTransformTest.java @@ -17,6 +17,8 @@ package com.google.cloud.pubsublite.beam; import com.google.cloud.pubsublite.proto.PubSubMessage; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; import com.google.protobuf.ByteString; import java.util.HashSet; import java.util.Set; @@ -26,8 +28,6 @@ import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTrackerTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTrackerTest.java index 47e5ae54c..9db34e184 100644 --- a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTrackerTest.java +++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTrackerTest.java @@ -30,11 +30,11 @@ import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse; +import com.google.common.base.Stopwatch; +import com.google.common.base.Ticker; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress; import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ticker; import org.joda.time.Duration; import org.junit.Before; import org.junit.Test; diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorImplTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorImplTest.java index cdca6c470..f03cdadb5 100644 --- a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorImplTest.java +++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorImplTest.java @@ -41,6 +41,7 @@ import com.google.cloud.pubsublite.proto.FlowControlRequest; import com.google.cloud.pubsublite.proto.SeekRequest; import com.google.cloud.pubsublite.proto.SequencedMessage; +import com.google.common.collect.ImmutableList; import com.google.protobuf.util.Timestamps; import java.util.List; import java.util.function.Consumer; @@ -49,7 +50,6 @@ import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/UuidDeduplicationTransformTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/UuidDeduplicationTransformTest.java index ec05cdca6..493722342 100644 --- a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/UuidDeduplicationTransformTest.java +++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/UuidDeduplicationTransformTest.java @@ -20,6 +20,8 @@ import com.google.cloud.pubsublite.proto.Cursor; import com.google.cloud.pubsublite.proto.PubSubMessage; import com.google.cloud.pubsublite.proto.SequencedMessage; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; import com.google.protobuf.ByteString; import com.google.protobuf.util.Timestamps; import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; @@ -28,8 +30,6 @@ import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.Deduplicate; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Rule; From 2edcdbf5839982fc194ca46490b6de40d0ccefae Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Tue, 23 Feb 2021 15:38:15 -0500 Subject: [PATCH 03/13] fix: bundle finalizers --- .../beam/PerSubscriptionPartitionSdf.java | 20 +++++++++---------- .../beam/PerSubscriptionPartitionSdfTest.java | 3 +-- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdf.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdf.java index 217bc2fd0..3c1f97435 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdf.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdf.java @@ -80,16 +80,16 @@ public ProcessContinuation processElement( .lastClaimed() .ifPresent( lastClaimedOffset -> { - Committer committer = committerFactory.apply(subscriptionPartition); - committer.startAsync().awaitRunning(); - // Commit the next-to-deliver offset. - try { - committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get(); - } catch (Exception e) { - throw ExtractStatus.toCanonical(e).underlying; - } - committer.stopAsync().awaitTerminated(); - }); + Committer committer = committerFactory.apply(subscriptionPartition); + committer.startAsync().awaitRunning(); + // Commit the next-to-deliver offset. + try { + committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get(); + } catch (Exception e) { + throw ExtractStatus.toCanonical(e).underlying; + } + committer.stopAsync().awaitTerminated(); + }); return result; } } diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java index a596eba57..c0980e466 100644 --- a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java +++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java @@ -143,8 +143,7 @@ public void process() throws Exception { }); doReturn(Optional.of(example(Offset.class))).when(processor).lastClaimed(); when(committer.commitOffset(any())).thenReturn(ApiFutures.immediateFuture(null)); - assertEquals( - ProcessContinuation.resume(), sdf.processElement(tracker, PARTITION, output)); + assertEquals(ProcessContinuation.resume(), sdf.processElement(tracker, PARTITION, output)); verify(processorFactory).newProcessor(eq(PARTITION), any(), eq(output)); InOrder order = inOrder(processor); order.verify(processor).start(); From a774526692975baf22661069335758583ecc55d0 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Tue, 23 Feb 2021 16:08:13 -0500 Subject: [PATCH 04/13] fix: SubscriberOptions --- .../cloud/pubsublite/beam/SubscriberOptions.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java index c8f2bcdad..b46af7d1b 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java @@ -34,6 +34,8 @@ import com.google.cloud.pubsublite.internal.wire.RoutingMetadata; import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder; import com.google.cloud.pubsublite.internal.wire.SubscriberFactory; +import com.google.cloud.pubsublite.v1.CursorServiceClient; +import com.google.cloud.pubsublite.v1.CursorServiceSettings; import com.google.cloud.pubsublite.v1.SubscriberServiceClient; import com.google.cloud.pubsublite.v1.SubscriberServiceSettings; import com.google.common.collect.ImmutableSet; @@ -127,6 +129,17 @@ SubscriberFactory getSubscriberFactory(Partition partition) { .build(); } + private CursorServiceClient newCursorServiceClient() throws ApiException { + try { + return CursorServiceClient.create( + addDefaultSettings( + subscriptionPath().location().region(), + CursorServiceSettings.newBuilder())); + } catch (Throwable t) { + throw toCanonical(t).underlying; + } + } + Committer getCommitter(Partition partition) { SerializableSupplier supplier = committerSupplier(); if (supplier != null) { @@ -135,6 +148,7 @@ Committer getCommitter(Partition partition) { return CommitterSettings.newBuilder() .setSubscriptionPath(subscriptionPath()) .setPartition(partition) + .setServiceClient(newCursorServiceClient()) .build() .instantiate(); } From ae0585fb10e74ed6d7483424cc1dcbd28ab4fed1 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Tue, 23 Feb 2021 16:17:54 -0500 Subject: [PATCH 05/13] fix: SubscriberOptions --- .../com/google/cloud/pubsublite/beam/SubscriberOptions.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java index b46af7d1b..066c331b4 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java @@ -133,8 +133,7 @@ private CursorServiceClient newCursorServiceClient() throws ApiException { try { return CursorServiceClient.create( addDefaultSettings( - subscriptionPath().location().region(), - CursorServiceSettings.newBuilder())); + subscriptionPath().location().region(), CursorServiceSettings.newBuilder())); } catch (Throwable t) { throw toCanonical(t).underlying; } From 9f08b7f361d2a4716cd9252f93742f3a77130154 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Tue, 23 Feb 2021 16:24:38 -0500 Subject: [PATCH 06/13] fix: pom --- pubsublite-beam-io/pom.xml | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/pubsublite-beam-io/pom.xml b/pubsublite-beam-io/pom.xml index e15d66fcc..fdb45b284 100644 --- a/pubsublite-beam-io/pom.xml +++ b/pubsublite-beam-io/pom.xml @@ -68,6 +68,14 @@ checker-qual 3.7.0 + + com.google.guava + guava + + + com.google.protobuf + protobuf-java + org.apache.beam beam-sdks-java-core @@ -118,6 +126,15 @@ true + + org.apache.maven.plugins + maven-dependency-plugin + + + org.apache.beam:beam-runners-direct-java + + + From 975b12634a03ba03216b5eb5d08027dd87212585 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Tue, 23 Feb 2021 16:30:29 -0500 Subject: [PATCH 07/13] fix: pom --- pubsublite-beam-io/pom.xml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pubsublite-beam-io/pom.xml b/pubsublite-beam-io/pom.xml index fdb45b284..c233502af 100644 --- a/pubsublite-beam-io/pom.xml +++ b/pubsublite-beam-io/pom.xml @@ -76,6 +76,10 @@ com.google.protobuf protobuf-java + + javax.annotation + javax.annotation-api + org.apache.beam beam-sdks-java-core From f9d8d960d29f13f769f9be8b6ae9d9fb82ac412b Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Tue, 23 Feb 2021 16:34:35 -0500 Subject: [PATCH 08/13] fix: pom --- pubsublite-beam-io/pom.xml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pubsublite-beam-io/pom.xml b/pubsublite-beam-io/pom.xml index c233502af..68737000a 100644 --- a/pubsublite-beam-io/pom.xml +++ b/pubsublite-beam-io/pom.xml @@ -137,6 +137,9 @@ org.apache.beam:beam-runners-direct-java + + javax.annotation:javax.annotation-api + From 7b0399093caae177db6e7dea357cf6eadedc8529 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Tue, 23 Feb 2021 21:21:01 -0500 Subject: [PATCH 09/13] fix: pom --- pubsublite-beam-io/pom.xml | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/pubsublite-beam-io/pom.xml b/pubsublite-beam-io/pom.xml index 68737000a..7f1448322 100644 --- a/pubsublite-beam-io/pom.xml +++ b/pubsublite-beam-io/pom.xml @@ -133,14 +133,22 @@ org.apache.maven.plugins maven-dependency-plugin - - - org.apache.beam:beam-runners-direct-java - - - javax.annotation:javax.annotation-api - - + + + analyze-dep + + analyze-only + + + + org.apache.beam:beam-runners-direct-java + + + javax.annotation:javax.annotation-api + + + + From 9ad969fef2bcca19e9dbd1f5d65abc0b56b4d1bc Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Tue, 23 Feb 2021 21:24:36 -0500 Subject: [PATCH 10/13] fix: pom --- pubsublite-beam-io/pom.xml | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pubsublite-beam-io/pom.xml b/pubsublite-beam-io/pom.xml index 7f1448322..11d827426 100644 --- a/pubsublite-beam-io/pom.xml +++ b/pubsublite-beam-io/pom.xml @@ -135,16 +135,18 @@ maven-dependency-plugin - analyze-dep + analyze analyze-only - org.apache.beam:beam-runners-direct-java - - - javax.annotation:javax.annotation-api + + org.apache.beam:beam-runners-direct-java + + + javax.annotation:javax.annotation-api + From 991dcb1ce900498029861c5583378258f5849f70 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Tue, 23 Feb 2021 21:27:00 -0500 Subject: [PATCH 11/13] fix: pom --- pubsublite-beam-io/pom.xml | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pubsublite-beam-io/pom.xml b/pubsublite-beam-io/pom.xml index 11d827426..2ba05aaef 100644 --- a/pubsublite-beam-io/pom.xml +++ b/pubsublite-beam-io/pom.xml @@ -142,10 +142,7 @@ - org.apache.beam:beam-runners-direct-java - - - javax.annotation:javax.annotation-api + org.apache.beam:beam-runners-direct-java:jar:2.28.0:test From f887d990510ed13b067b8a78e4140fec54f4c251 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Tue, 23 Feb 2021 21:51:26 -0500 Subject: [PATCH 12/13] fix: pom --- pubsublite-beam-io/pom.xml | 3 --- 1 file changed, 3 deletions(-) diff --git a/pubsublite-beam-io/pom.xml b/pubsublite-beam-io/pom.xml index 2ba05aaef..7384b2fdb 100644 --- a/pubsublite-beam-io/pom.xml +++ b/pubsublite-beam-io/pom.xml @@ -136,9 +136,6 @@ analyze - - analyze-only - From 8eae0c58135ba062265521be9c26f18479f6eb95 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Tue, 23 Feb 2021 21:54:12 -0500 Subject: [PATCH 13/13] fix: pom --- pubsublite-beam-io/pom.xml | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/pubsublite-beam-io/pom.xml b/pubsublite-beam-io/pom.xml index 7384b2fdb..099247964 100644 --- a/pubsublite-beam-io/pom.xml +++ b/pubsublite-beam-io/pom.xml @@ -131,20 +131,12 @@ + org.apache.maven.plugins maven-dependency-plugin - - - analyze - - - - org.apache.beam:beam-runners-direct-java:jar:2.28.0:test - - - - - + + true +