viaRandomKey().withNumBuckets(1000));
+ }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/CloudPubsubChecks.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/CloudPubsubChecks.java
new file mode 100644
index 000000000..5b59e425d
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/CloudPubsubChecks.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.proto.PubSubMessage;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A class providing a conversion validity check between Cloud Pub/Sub and Pub/Sub Lite message
+ * types.
+ */
+public final class CloudPubsubChecks {
+  private CloudPubsubChecks() {} // Static utility holder; not instantiable.
+
+  /**
+   * Ensure that all messages that pass through can be converted to Cloud Pub/Sub messages using the
+   * standard transformation methods in the client library.
+   *
+   * <p>Will fail the pipeline if a message has multiple attributes per key.
+   */
+  public static PTransform<PCollection<? extends PubSubMessage>, PCollection<PubSubMessage>>
+      ensureUsableAsCloudPubsub() {
+    return MapElements.into(TypeDescriptor.of(PubSubMessage.class))
+        .via(
+            message -> {
+              Object unused = toCpsPublishTransformer().transform(Message.fromProto(message));
+              return message; // Conversion ran for validation only; emit the input as-is.
+            });
+  }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/DlqProvider.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/DlqProvider.java
new file mode 100644
index 000000000..fd4033645
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/DlqProvider.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import com.google.auto.service.AutoService;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.proto.AttributeValues;
+import com.google.cloud.pubsublite.proto.PubSubMessage;
+import com.google.protobuf.ByteString;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.io.Failure;
+import org.apache.beam.sdk.schemas.io.GenericDlqProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+@Internal
+@AutoService(GenericDlqProvider.class)
+public class DlqProvider implements GenericDlqProvider {
+  @Override
+  public String identifier() {
+    return "pubsublite"; // Identifier string reported for this provider.
+  }
+
+  @Override
+  public PTransform<PCollection<Failure>, PDone> newDlqTransform(String config) {
+    return new DlqTransform(TopicPath.parse(config)); // config is parsed as a topic path.
+  }
+
+  private static class DlqTransform extends PTransform<PCollection<Failure>, PDone> {
+    private final TopicPath topic; // Destination topic for the converted failures.
+
+    DlqTransform(TopicPath topic) {
+      this.topic = topic;
+    }
+
+    @Override
+    public PDone expand(PCollection<Failure> input) {
+      return input
+          .apply(
+              "Failure to PubSubMessage",
+              MapElements.into(TypeDescriptor.of(PubSubMessage.class))
+                  .via(DlqTransform::getMessage))
+          .apply(
+              "Write Failures to Pub/Sub Lite",
+              PubsubLiteIO.write(PublisherOptions.newBuilder().setTopicPath(topic).build()));
+    }
+
+    private static PubSubMessage getMessage(Failure failure) {
+      PubSubMessage.Builder builder = PubSubMessage.newBuilder();
+      builder.putAttributes(
+          "beam-dlq-error", // Attribute key carrying the failure's error text.
+          AttributeValues.newBuilder()
+              .addValues(ByteString.copyFromUtf8(failure.getError()))
+              .build());
+      builder.setData(ByteString.copyFrom(failure.getPayload())); // Data is the failure payload.
+      return builder.build();
+    }
+  }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/InitialOffsetReader.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/InitialOffsetReader.java
new file mode 100644
index 000000000..1832c93eb
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/InitialOffsetReader.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.Offset;
+
+interface InitialOffsetReader extends AutoCloseable {
+ Offset read() throws ApiException; // Reads the offset used as the start of the initial range.
+
+ @Override
+ void close(); // Redeclared without a throws clause, unlike AutoCloseable.close().
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/InitialOffsetReaderImpl.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/InitialOffsetReaderImpl.java
new file mode 100644
index 000000000..eea174738
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/InitialOffsetReaderImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.internal.CursorClient;
+import java.util.Map;
+
+class InitialOffsetReaderImpl implements InitialOffsetReader {
+  private final CursorClient client;
+  private final SubscriptionPath subscription;
+  private final Partition partition;
+
+  InitialOffsetReaderImpl(CursorClient client, SubscriptionPath subscription, Partition partition) {
+    this.client = client;
+    this.subscription = subscription;
+    this.partition = partition;
+  }
+
+  @Override
+  public Offset read() throws ApiException {
+    try {
+      Map<Partition, Offset> results = client.listPartitionCursors(subscription).get();
+      return results.getOrDefault(partition, Offset.of(0)); // No cursor listed: use offset 0.
+    } catch (Throwable t) {
+      throw toCanonical(t).underlying; // Any failure is rethrown in its canonical form.
+    }
+  }
+
+  @Override
+  public void close() {
+    client.close(); // Closes the wrapped cursor client.
+  }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/LimitingTopicBacklogReader.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/LimitingTopicBacklogReader.java
new file mode 100644
index 000000000..2e2043249
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/LimitingTopicBacklogReader.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ticker;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
+
+final class LimitingTopicBacklogReader implements TopicBacklogReader {
+  private final TopicBacklogReader underlying;
+  private final LoadingCache<String, ComputeMessageStatsResponse> backlogCache;
+
+  @GuardedBy("this")
+  @Nullable
+  private Offset currentRequestOffset = null;
+
+  @SuppressWarnings("method.invocation.invalid")
+  LimitingTopicBacklogReader(TopicBacklogReader underlying, Ticker ticker) {
+    this.underlying = underlying;
+    backlogCache =
+        CacheBuilder.newBuilder()
+            .ticker(ticker)
+            .maximumSize(1)
+            .expireAfterWrite(1, TimeUnit.MINUTES) // Entry expires a minute after being written.
+            .refreshAfterWrite(10, TimeUnit.SECONDS) // Eligible for reload 10s after a write.
+            .build(
+                new CacheLoader<String, ComputeMessageStatsResponse>() {
+                  @Override
+                  public ComputeMessageStatsResponse load(String val) {
+                    return loadFromUnderlying(); // The cache key itself is not used.
+                  }
+                });
+  }
+
+  @SuppressWarnings("argument.type.incompatible")
+  private synchronized ComputeMessageStatsResponse loadFromUnderlying() {
+    return underlying.computeMessageStats(checkNotNull(currentRequestOffset));
+  }
+
+  @Override
+  public synchronized ComputeMessageStatsResponse computeMessageStats(Offset offset)
+      throws ApiException {
+    currentRequestOffset = offset; // Read by loadFromUnderlying() when the cache has to load.
+    try {
+      // There is only a single entry in the cache.
+      return backlogCache.get("cache");
+    } catch (Throwable t) {
+      throw toCanonical(t).underlying;
+    }
+  }
+
+  @Override
+  public void close() {
+    underlying.close();
+  }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/OffsetByteProgress.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/OffsetByteProgress.java
new file mode 100644
index 000000000..8fa88a5af
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/OffsetByteProgress.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.pubsublite.Offset;
+
+/** A representation of progress through a Pub/Sub lite partition. */
+@AutoValue
+abstract class OffsetByteProgress {
+ static OffsetByteProgress of(Offset lastOffset, long batchBytes) {
+ return new AutoValue_OffsetByteProgress(lastOffset, batchBytes);
+ }
+ /** The last offset of the messages received. */
+ abstract Offset lastOffset();
+
+ abstract long batchBytes(); // Byte count supplied together with lastOffset() in of().
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTracker.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTracker.java
new file mode 100644
index 000000000..d3b01e351
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTracker.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch;
+import org.joda.time.Duration;
+
+/**
+ * OffsetByteRangeTracker is an unbounded restriction tracker for Pub/Sub lite partitions that
+ * tracks offsets for checkpointing and bytes for progress.
+ *
+ * <p>Any valid instance of an OffsetByteRangeTracker tracks one of exactly two types of ranges: -
+ * Unbounded ranges whose last offset is Long.MAX_VALUE - Completed ranges that are either empty
+ * (From == To) or fully claimed (lastClaimed == To - 1)
+ *
+ * <p>Also prevents splitting until minTrackingTime has passed or minBytesReceived have been
+ * received. IMPORTANT: minTrackingTime must be strictly smaller than the SDF read timeout when it
+ * would return ProcessContinuation.resume().
+ */
+class OffsetByteRangeTracker extends RestrictionTracker<OffsetRange, OffsetByteProgress>
+    implements HasProgress {
+  private final TopicBacklogReader backlogReader;
+  private final Duration minTrackingTime;
+  private final long minBytesReceived;
+  private final Stopwatch stopwatch;
+  private OffsetRange range;
+  private @Nullable Long lastClaimed; // Null until the first successful claim.
+  private long byteCount = 0; // Sum of batchBytes() over all successful claims.
+
+  public OffsetByteRangeTracker(
+      OffsetRange range,
+      TopicBacklogReader backlogReader,
+      Stopwatch stopwatch,
+      Duration minTrackingTime,
+      long minBytesReceived) {
+    checkArgument(range.getTo() == Long.MAX_VALUE); // Only unbounded ranges are accepted.
+    this.backlogReader = backlogReader;
+    this.minTrackingTime = minTrackingTime;
+    this.minBytesReceived = minBytesReceived;
+    this.stopwatch = stopwatch.reset().start(); // Tracking time is measured from construction.
+    this.range = range;
+  }
+
+  @Override
+  public void finalize() {
+    this.backlogReader.close();
+  }
+
+  @Override
+  public IsBounded isBounded() {
+    return IsBounded.UNBOUNDED;
+  }
+
+  @Override
+  public boolean tryClaim(OffsetByteProgress position) {
+    long toClaim = position.lastOffset().value();
+    checkArgument(
+        lastClaimed == null || toClaim > lastClaimed,
+        "Trying to claim offset %s while last attempted was %s",
+        position.lastOffset().value(),
+        lastClaimed);
+    checkArgument(
+        toClaim >= range.getFrom(),
+        "Trying to claim offset %s before start of the range %s",
+        toClaim,
+        range);
+    // split() has already been called, truncating this range. No more offsets may be claimed.
+    if (range.getTo() != Long.MAX_VALUE) {
+      boolean isRangeEmpty = range.getTo() == range.getFrom();
+      boolean isValidClosedRange = nextOffset() == range.getTo();
+      checkState(
+          isRangeEmpty || isValidClosedRange,
+          "Violated class precondition: offset range improperly split. Please report a beam bug.");
+      return false;
+    }
+    lastClaimed = toClaim;
+    byteCount += position.batchBytes();
+    return true;
+  }
+
+  @Override
+  public OffsetRange currentRestriction() {
+    return range;
+  }
+
+  private long nextOffset() {
+    checkState(lastClaimed == null || lastClaimed < Long.MAX_VALUE); // Guards the + 1 below.
+    return lastClaimed == null ? currentRestriction().getFrom() : lastClaimed + 1;
+  }
+
+  /**
+   * Whether the tracker has received enough data/been running for enough time that it can
+   * checkpoint and be confident it can get sufficient throughput.
+   */
+  private boolean receivedEnough() {
+    Duration duration = Duration.millis(stopwatch.elapsed(TimeUnit.MILLISECONDS));
+    if (duration.isLongerThan(minTrackingTime)) {
+      return true;
+    }
+    if (byteCount >= minBytesReceived) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public @Nullable SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
+    // Cannot split a bounded range. This should already be completely claimed.
+    if (range.getTo() != Long.MAX_VALUE) {
+      return null;
+    }
+    if (!receivedEnough()) {
+      return null;
+    }
+    range = new OffsetRange(currentRestriction().getFrom(), nextOffset()); // Keep claimed part.
+    return SplitResult.of(this.range, new OffsetRange(nextOffset(), Long.MAX_VALUE));
+  }
+
+  @Override
+  @SuppressWarnings("unboxing.of.nullable")
+  public void checkDone() throws IllegalStateException {
+    if (range.getFrom() == range.getTo()) {
+      return; // An empty range has nothing left to claim.
+    }
+    checkState(
+        lastClaimed != null,
+        "Last attempted offset should not be null. No work was claimed in non-empty range %s.",
+        range);
+    long lastClaimedNotNull = checkNotNull(lastClaimed);
+    checkState(
+        lastClaimedNotNull >= range.getTo() - 1,
+        "Last attempted offset was %s in range %s, claiming work in [%s, %s) was not attempted",
+        lastClaimedNotNull,
+        range,
+        lastClaimedNotNull + 1,
+        range.getTo());
+  }
+
+  @Override
+  public Progress getProgress() {
+    ComputeMessageStatsResponse stats =
+        this.backlogReader.computeMessageStats(Offset.of(nextOffset()));
+    return Progress.from(byteCount, stats.getMessageBytes()); // (bytes claimed, backlog bytes).
+  }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerServerPublisherCache.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerServerPublisherCache.java
new file mode 100644
index 000000000..1b2a3f4c4
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerServerPublisherCache.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+/**
+ * A shared cache per-worker instance of Pub/Sub Lite publishers.
+ *
+ * <p>Pub/Sub Lite publishers connect to all available partitions: it would be a pessimization for
+ * all instances of the PubsubLiteSink to do this.
+ */
+final class PerServerPublisherCache {
+ private PerServerPublisherCache() {} // Holder for the static cache only; not instantiable.
+
+ static final PublisherCache PUBLISHER_CACHE = new PublisherCache(); // Single shared instance.
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdf.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdf.java
new file mode 100644
index 000000000..c1d8b86db
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdf.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.internal.wire.Committer;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableBiFunction;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+class PerSubscriptionPartitionSdf extends DoFn<SubscriptionPartition, SequencedMessage> {
+  private final Duration maxSleepTime; // Passed to the processor's waitForCompletion().
+  private final SubscriptionPartitionProcessorFactory processorFactory;
+  private final SerializableFunction<SubscriptionPartition, InitialOffsetReader>
+      offsetReaderFactory;
+  private final SerializableBiFunction<
+          SubscriptionPartition, OffsetRange, RestrictionTracker<OffsetRange, OffsetByteProgress>>
+      trackerFactory;
+  private final SerializableFunction<SubscriptionPartition, Committer> committerFactory;
+
+  PerSubscriptionPartitionSdf(
+      Duration maxSleepTime,
+      SerializableFunction<SubscriptionPartition, InitialOffsetReader> offsetReaderFactory,
+      SerializableBiFunction<
+              SubscriptionPartition,
+              OffsetRange,
+              RestrictionTracker<OffsetRange, OffsetByteProgress>>
+          trackerFactory,
+      SubscriptionPartitionProcessorFactory processorFactory,
+      SerializableFunction<SubscriptionPartition, Committer> committerFactory) {
+    this.maxSleepTime = maxSleepTime;
+    this.processorFactory = processorFactory;
+    this.offsetReaderFactory = offsetReaderFactory;
+    this.trackerFactory = trackerFactory;
+    this.committerFactory = committerFactory;
+  }
+
+  @GetInitialWatermarkEstimatorState
+  public Instant getInitialWatermarkState() {
+    return Instant.EPOCH;
+  }
+
+  @NewWatermarkEstimator
+  public MonotonicallyIncreasing newWatermarkEstimator(@WatermarkEstimatorState Instant state) {
+    return new MonotonicallyIncreasing(state);
+  }
+
+  @ProcessElement
+  public ProcessContinuation processElement(
+      RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
+      @Element SubscriptionPartition subscriptionPartition,
+      OutputReceiver<SequencedMessage> receiver,
+      BundleFinalizer finalizer)
+      throws Exception {
+    try (SubscriptionPartitionProcessor processor =
+        processorFactory.newProcessor(subscriptionPartition, tracker, receiver)) {
+      processor.start();
+      ProcessContinuation result = processor.waitForCompletion(maxSleepTime);
+      processor
+          .lastClaimed()
+          .ifPresent( // Nothing claimed in this call: no commit callback is registered.
+              lastClaimedOffset ->
+                  finalizer.afterBundleCommit(
+                      Instant.ofEpochMilli(Long.MAX_VALUE), // Latest representable expiry.
+                      () -> {
+                        Committer committer = committerFactory.apply(subscriptionPartition);
+                        committer.startAsync().awaitRunning();
+                        // Commit the next-to-deliver offset.
+                        committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get();
+                        committer.stopAsync().awaitTerminated();
+                      }));
+      return result;
+    }
+  }
+
+  @GetInitialRestriction
+  public OffsetRange getInitialRestriction(@Element SubscriptionPartition subscriptionPartition) {
+    try (InitialOffsetReader reader = offsetReaderFactory.apply(subscriptionPartition)) {
+      Offset offset = reader.read();
+      return new OffsetRange(offset.value(), Long.MAX_VALUE /* open interval */);
+    }
+  }
+
+  @NewTracker
+  public RestrictionTracker<OffsetRange, OffsetByteProgress> newTracker(
+      @Element SubscriptionPartition subscriptionPartition, @Restriction OffsetRange range) {
+    return trackerFactory.apply(subscriptionPartition, range);
+  }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PublisherCache.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PublisherCache.java
new file mode 100644
index 000000000..397a6b893
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PublisherCache.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import static com.google.cloud.pubsublite.internal.UncheckedApiPreconditions.checkArgument;
+
+import com.google.api.core.ApiService.Listener;
+import com.google.api.core.ApiService.State;
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.MessageMetadata;
+import com.google.cloud.pubsublite.internal.CloseableMonitor;
+import com.google.cloud.pubsublite.internal.Publisher;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import java.util.HashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+
+/** A map of working publishers by PublisherOptions. */
+class PublisherCache {
+  private final CloseableMonitor monitor = new CloseableMonitor();
+
+  private final Executor listenerExecutor = Executors.newSingleThreadExecutor();
+
+  @GuardedBy("monitor.monitor")
+  private final HashMap<PublisherOptions, Publisher<MessageMetadata>> livePublishers =
+      new HashMap<>();
+
+  Publisher<MessageMetadata> get(PublisherOptions options) throws ApiException {
+    checkArgument(options.usesCache()); // Options carrying their own supplier are rejected.
+    try (CloseableMonitor.Hold h = monitor.enter()) {
+      Publisher<MessageMetadata> publisher = livePublishers.get(options);
+      if (publisher != null) {
+        return publisher; // Reuse the publisher already cached for these options.
+      }
+      publisher = Publishers.newPublisher(options);
+      livePublishers.put(options, publisher);
+      publisher.addListener(
+          new Listener() {
+            @Override
+            public void failed(State s, Throwable t) {
+              try (CloseableMonitor.Hold h = monitor.enter()) {
+                livePublishers.remove(options); // Drop the failed publisher from the cache.
+              }
+            }
+          },
+          listenerExecutor);
+      publisher.startAsync().awaitRunning(); // Still holding the monitor while starting.
+      return publisher;
+    }
+  }
+
+  @VisibleForTesting
+  void set(PublisherOptions options, Publisher<MessageMetadata> toCache) {
+    try (CloseableMonitor.Hold h = monitor.enter()) {
+      livePublishers.put(options, toCache);
+    }
+  }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PublisherOptions.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PublisherOptions.java
new file mode 100644
index 000000000..19c114eda
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PublisherOptions.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.pubsublite.TopicPath;
+import java.io.Serializable;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Options needed for a Pub/Sub Lite Publisher. */
+@AutoValue
+public abstract class PublisherOptions implements Serializable {
+  private static final long serialVersionUID = 275311613L;
+
+  // Required parameters.
+  public abstract TopicPath topicPath();
+
+  // Optional parameters.
+  /**
+   * A supplier for the publisher to be used. If enabled, does not use the publisher cache.
+   *
+   * <p>The returned type must be convertible to {@code Publisher<MessageMetadata>}, but Object is
+   * used to prevent adding an api surface dependency on guava when this is not used.
+   */
+  public abstract @Nullable SerializableSupplier<Object> publisherSupplier();
+
+  @Override
+  public abstract int hashCode();
+
+  public static Builder newBuilder() {
+    return new AutoValue_PublisherOptions.Builder();
+  }
+
+  public boolean usesCache() {
+    return publisherSupplier() == null; // The cache is used only when no supplier is set.
+  }
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+    // Required parameters.
+    public abstract Builder setTopicPath(TopicPath path);
+
+    // Optional parameters.
+    /**
+     * A supplier for the publisher to be used. If enabled, does not use the publisher cache.
+     *
+     * <p>The returned type must be convertible to {@code Publisher<MessageMetadata>}, but Object is
+     * used to prevent adding an api surface dependency on guava when this is not used.
+     */
+    public abstract Builder setPublisherSupplier(SerializableSupplier<Object> stubSupplier);
+
+    public abstract PublisherOptions build();
+  }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PublisherOrError.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PublisherOrError.java
new file mode 100644
index 000000000..b8d88aa9b
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PublisherOrError.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import com.google.auto.value.AutoOneOf;
+import com.google.cloud.pubsublite.MessageMetadata;
+import com.google.cloud.pubsublite.internal.CheckedApiException;
+import com.google.cloud.pubsublite.internal.Publisher;
+
+/** A helper representing either a Publisher or an error. */
+@AutoOneOf(PublisherOrError.Kind.class)
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+abstract class PublisherOrError {
+  enum Kind {
+    PUBLISHER,
+    ERROR
+  }
+
+  abstract Kind getKind(); // Tells which of the two alternatives this instance holds.
+
+  abstract Publisher<MessageMetadata> publisher(); // The PUBLISHER alternative.
+
+  abstract CheckedApiException error(); // The ERROR alternative.
+
+  static PublisherOrError ofPublisher(Publisher<MessageMetadata> p) {
+    return AutoOneOf_PublisherOrError.publisher(p);
+  }
+
+  static PublisherOrError ofError(CheckedApiException e) {
+    return AutoOneOf_PublisherOrError.error(e);
+  }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/Publishers.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/Publishers.java
new file mode 100644
index 000000000..f8305ea15
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/Publishers.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
+import static com.google.cloud.pubsublite.internal.UncheckedApiPreconditions.checkArgument;
+import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata;
+import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.AdminClient;
+import com.google.cloud.pubsublite.AdminClientSettings;
+import com.google.cloud.pubsublite.MessageMetadata;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.internal.Publisher;
+import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings;
+import com.google.cloud.pubsublite.internal.wire.PubsubContext;
+import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
+import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
+import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
+import com.google.cloud.pubsublite.v1.AdminServiceClient;
+import com.google.cloud.pubsublite.v1.AdminServiceSettings;
+import com.google.cloud.pubsublite.v1.PublisherServiceClient;
+import com.google.cloud.pubsublite.v1.PublisherServiceSettings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken;
+
+class Publishers {
+ private static final Framework FRAMEWORK = Framework.of("BEAM");
+
+ private Publishers() {}
+
+ private static AdminClient newAdminClient(PublisherOptions options) throws ApiException {
+ try {
+ return AdminClient.create(
+ AdminClientSettings.newBuilder()
+ .setServiceClient(
+ AdminServiceClient.create(
+ addDefaultSettings(
+ options.topicPath().location().region(),
+ AdminServiceSettings.newBuilder())))
+ .setRegion(options.topicPath().location().region())
+ .build());
+ } catch (Throwable t) {
+ throw toCanonical(t).underlying;
+ }
+ }
+
+ private static PublisherServiceClient newServiceClient(
+ PublisherOptions options, Partition partition) {
+ PublisherServiceSettings.Builder settingsBuilder = PublisherServiceSettings.newBuilder();
+ settingsBuilder =
+ addDefaultMetadata(
+ PubsubContext.of(FRAMEWORK),
+ RoutingMetadata.of(options.topicPath(), partition),
+ settingsBuilder);
+ try {
+ return PublisherServiceClient.create(
+ addDefaultSettings(options.topicPath().location().region(), settingsBuilder));
+ } catch (Throwable t) {
+ throw toCanonical(t).underlying;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ static Publisher newPublisher(PublisherOptions options) throws ApiException {
+ SerializableSupplier supplier = options.publisherSupplier();
+ if (supplier != null) {
+ Object supplied = supplier.get();
+ TypeToken> token = new TypeToken>() {};
+ checkArgument(token.isSupertypeOf(supplied.getClass()));
+ return (Publisher) supplied;
+ }
+ return PartitionCountWatchingPublisherSettings.newBuilder()
+ .setTopic(options.topicPath())
+ .setPublisherFactory(
+ partition ->
+ SinglePartitionPublisherBuilder.newBuilder()
+ .setTopic(options.topicPath())
+ .setPartition(partition)
+ .setServiceClient(newServiceClient(options, partition))
+ .build())
+ .setAdminClient(newAdminClient(options))
+ .build()
+ .instantiate();
+ }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteIO.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteIO.java
new file mode 100644
index 000000000..35d33c443
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteIO.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import com.google.cloud.pubsublite.proto.PubSubMessage;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * I/O transforms for reading from and writing to Google Pub/Sub Lite.
+ *
+ * <p>For the differences between this and Google Pub/Sub, please refer to the product
+ * documentation.
+ */
+@Experimental
+public final class PubsubLiteIO {
+  private PubsubLiteIO() {}
+
+  /**
+   * Read messages from Pub/Sub Lite. These messages may contain duplicates if the publisher
+   * retried, which the PubsubLiteIO write method will do. Use {@link #deduplicate} to remove these
+   * duplicates.
+   *
+   * <pre>{@code
+   * Pipeline p = ...;
+   *
+   * SubscriptionPath subscriptionPath =
+   *     SubscriptionPath.newBuilder()
+   *         .setLocation(zone)
+   *         .setProjectNumber(projectNum)
+   *         .setName(subscriptionName)
+   *         .build();
+   *
+   * PCollection<SequencedMessage> messages = p.apply("read", PubsubLiteIO.read(
+   *     SubscriberOptions.newBuilder()
+   *         .setSubscriptionPath(subscriptionPath)
+   *         .build()));
+   * }</pre>
+   *
+   * @param options the subscription to read from and how to read it
+   * @return a root transform producing the messages received on the subscription
+   */
+  public static PTransform<PBegin, PCollection<SequencedMessage>> read(SubscriberOptions options) {
+    return new SubscribeTransform(options);
+  }
+
+  /**
+   * Remove duplicates from the PCollection produced by a read. Assumes by default that the uuids
+   * were added by a call to PubsubLiteIO.addUuids() when published.
+   *
+   * <pre>{@code
+   * PCollection<SequencedMessage> messages = ... (above) ...;
+   * messages = messages.apply(PubsubLiteIO.deduplicate(
+   *     UuidDeduplicationOptions.newBuilder().build()));
+   * }</pre>
+   *
+   * @param options configuration for the deduplication transform
+   */
+  public static PTransform<PCollection<SequencedMessage>, PCollection<SequencedMessage>>
+      deduplicate(UuidDeduplicationOptions options) {
+    return new UuidDeduplicationTransform(options);
+  }
+
+  /**
+   * Add Uuids to to-be-published messages that ensures that uniqueness is maintained.
+   *
+   * <pre>{@code
+   * PCollection<PubSubMessage> messages = ...;
+   * messages = messages.apply(PubsubLiteIO.addUuids());
+   * }</pre>
+   */
+  public static PTransform<PCollection<PubSubMessage>, PCollection<PubSubMessage>> addUuids() {
+    return new AddUuidsTransform();
+  }
+
+  /**
+   * Write messages to Pub/Sub Lite.
+   *
+   * <pre>{@code
+   * TopicPath topicPath =
+   *     TopicPath.newBuilder()
+   *         .setProjectNumber(projectNum)
+   *         .setLocation(zone)
+   *         .setName(topicName)
+   *         .build();
+   *
+   * PCollection<PubSubMessage> messages = ...;
+   * messages.apply(PubsubLiteIO.write(
+   *     PublisherOptions.newBuilder().setTopicPath(topicPath).build()));
+   * }</pre>
+   *
+   * @param options the topic to publish to and how to publish to it
+   * @return a terminal transform that publishes every input message
+   */
+  public static PTransform<PCollection<PubSubMessage>, PDone> write(PublisherOptions options) {
+    return new PTransform<PCollection<PubSubMessage>, PDone>("PubsubLiteIO") {
+      @Override
+      public PDone expand(PCollection<PubSubMessage> input) {
+        PubsubLiteSink sink = new PubsubLiteSink(options);
+        input.apply("Write", ParDo.of(sink));
+        return PDone.in(input.getPipeline());
+      }
+    };
+  }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteSink.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteSink.java
new file mode 100644
index 000000000..27facddc7
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteSink.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.api.core.ApiService.Listener;
+import com.google.api.core.ApiService.State;
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.MessageMetadata;
+import com.google.cloud.pubsublite.beam.PublisherOrError.Kind;
+import com.google.cloud.pubsublite.internal.CheckedApiException;
+import com.google.cloud.pubsublite.internal.ExtractStatus;
+import com.google.cloud.pubsublite.internal.Publisher;
+import com.google.cloud.pubsublite.proto.PubSubMessage;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+
+/** A sink which publishes messages to Pub/Sub Lite. */
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+class PubsubLiteSink extends DoFn {
+ private final PublisherOptions options;
+
+ @GuardedBy("this")
+ private transient PublisherOrError publisherOrError;
+
+ // Whenever outstanding is decremented, notify() must be called.
+ @GuardedBy("this")
+ private transient int outstanding;
+
+ @GuardedBy("this")
+ private transient Deque errorsSinceLastFinish;
+
+ private static final Executor executor = Executors.newCachedThreadPool();
+
+ PubsubLiteSink(PublisherOptions options) {
+ this.options = options;
+ }
+
+ @Setup
+ public void setup() throws ApiException {
+ Publisher publisher;
+ if (options.usesCache()) {
+ publisher = PerServerPublisherCache.PUBLISHER_CACHE.get(options);
+ } else {
+ publisher = Publishers.newPublisher(options);
+ }
+ synchronized (this) {
+ outstanding = 0;
+ errorsSinceLastFinish = new ArrayDeque<>();
+ publisherOrError = PublisherOrError.ofPublisher(publisher);
+ }
+ // cannot declare in inner class since 'this' means something different.
+ Consumer onFailure =
+ t -> {
+ synchronized (this) {
+ publisherOrError = PublisherOrError.ofError(ExtractStatus.toCanonical(t));
+ }
+ };
+ publisher.addListener(
+ new Listener() {
+ @Override
+ public void failed(State s, Throwable t) {
+ onFailure.accept(t);
+ }
+ },
+ MoreExecutors.directExecutor());
+ if (!options.usesCache()) {
+ publisher.startAsync();
+ }
+ }
+
+ private synchronized void decrementOutstanding() {
+ --outstanding;
+ notify();
+ }
+
+ @ProcessElement
+ public synchronized void processElement(@Element PubSubMessage message)
+ throws CheckedApiException {
+ ++outstanding;
+ if (publisherOrError.getKind() == Kind.ERROR) {
+ throw publisherOrError.error();
+ }
+ ApiFuture future =
+ publisherOrError.publisher().publish(Message.fromProto(message));
+ // cannot declare in inner class since 'this' means something different.
+ Consumer onFailure =
+ t -> {
+ synchronized (this) {
+ decrementOutstanding();
+ errorsSinceLastFinish.push(ExtractStatus.toCanonical(t));
+ }
+ };
+ ApiFutures.addCallback(
+ future,
+ new ApiFutureCallback() {
+ @Override
+ public void onSuccess(MessageMetadata MessageMetadata) {
+ decrementOutstanding();
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ onFailure.accept(t);
+ }
+ },
+ executor);
+ }
+
+ // Intentionally don't flush on bundle finish to allow multi-sink client reuse.
+ @FinishBundle
+ public synchronized void finishBundle() throws CheckedApiException, InterruptedException {
+ while (outstanding > 0) {
+ wait();
+ }
+ if (!errorsSinceLastFinish.isEmpty()) {
+ CheckedApiException canonical = errorsSinceLastFinish.pop();
+ while (!errorsSinceLastFinish.isEmpty()) {
+ canonical.addSuppressed(errorsSinceLastFinish.pop());
+ }
+ throw canonical;
+ }
+ if (publisherOrError.getKind() == Kind.ERROR) {
+ throw publisherOrError.error();
+ }
+ }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SerializableSubscriberFactory.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SerializableSubscriberFactory.java
new file mode 100644
index 000000000..f84401325
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SerializableSubscriberFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.internal.wire.Subscriber;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import java.io.Serializable;
+import java.util.List;
+import java.util.function.Consumer;
+
+interface SerializableSubscriberFactory extends Serializable {
+ long serialVersionUID = -6978345654136456L;
+
+ Subscriber newSubscriber(Partition partition, Consumer> messageConsumer)
+ throws ApiException;
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SerializableSupplier.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SerializableSupplier.java
new file mode 100644
index 000000000..f90053cae
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SerializableSupplier.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import java.io.Serializable;
+
+/**
+ * A serializable Supplier.
+ *
+ * @param <T> the type of value supplied
+ */
+@FunctionalInterface
+public interface SerializableSupplier<T> extends Serializable {
+  /** Returns the supplied value. */
+  T get();
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java
new file mode 100644
index 000000000..46887809e
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
+import static com.google.cloud.pubsublite.internal.UncheckedApiPreconditions.checkArgument;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.AdminClient;
+import com.google.cloud.pubsublite.AdminClientSettings;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.internal.wire.Committer;
+import com.google.cloud.pubsublite.internal.wire.Subscriber;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath;
+import org.joda.time.Duration;
+
+/**
+ * Root transform that reads a Pub/Sub Lite subscription.
+ *
+ * <p>It produces a collection of {@link SubscriptionPartition}s, either loaded for the
+ * subscription's topic or fixed by {@link SubscriberOptions#partitions()}, and reads each one
+ * with a {@link PerSubscriptionPartitionSdf}.
+ */
+class SubscribeTransform extends PTransform<PBegin, PCollection<SequencedMessage>> {
+  private static final Duration MAX_SLEEP_TIME = Duration.standardMinutes(1);
+
+  private final SubscriberOptions options;
+
+  SubscribeTransform(SubscriberOptions options) {
+    this.options = options;
+  }
+
+  /** Rejects elements that belong to a subscription other than the configured one. */
+  private void checkSubscription(SubscriptionPartition subscriptionPartition) throws ApiException {
+    checkArgument(subscriptionPartition.subscription().equals(options.subscriptionPath()));
+  }
+
+  /**
+   * Creates a subscriber for {@code partition} that converts each received batch to protos before
+   * passing it to {@code consumer}. Failures are rethrown as the canonical {@link ApiException}.
+   */
+  private Subscriber newSubscriber(Partition partition, Consumer<List<SequencedMessage>> consumer) {
+    try {
+      return options
+          .getSubscriberFactory(partition)
+          .newSubscriber(
+              messages ->
+                  consumer.accept(
+                      messages.stream()
+                          .map(message -> message.toProto())
+                          .collect(Collectors.toList())));
+    } catch (Throwable t) {
+      throw toCanonical(t).underlying;
+    }
+  }
+
+  /** Creates the processor that reads one subscription partition into {@code receiver}. */
+  // NOTE(review): OffsetByteProgress as the tracker's position type is inferred from the
+  // project's OffsetByteRangeTracker, which this chunk does not show; confirm.
+  private SubscriptionPartitionProcessor newPartitionProcessor(
+      SubscriptionPartition subscriptionPartition,
+      RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
+      OutputReceiver<SequencedMessage> receiver)
+      throws ApiException {
+    checkSubscription(subscriptionPartition);
+    return new SubscriptionPartitionProcessorImpl(
+        tracker,
+        receiver,
+        consumer -> newSubscriber(subscriptionPartition.partition(), consumer),
+        options.flowControlSettings());
+  }
+
+  /** Creates the restriction tracker for one subscription partition, starting from {@code initial}. */
+  private RestrictionTracker<OffsetRange, OffsetByteProgress> newRestrictionTracker(
+      SubscriptionPartition subscriptionPartition, OffsetRange initial) {
+    checkSubscription(subscriptionPartition);
+    return new OffsetByteRangeTracker(
+        initial,
+        options.getBacklogReader(subscriptionPartition.partition()),
+        Stopwatch.createUnstarted(),
+        MAX_SLEEP_TIME.multipliedBy(3).dividedBy(4),
+        LongMath.saturatedMultiply(options.flowControlSettings().bytesOutstanding(), 10));
+  }
+
+  /** Creates the initial offset reader for one subscription partition. */
+  private InitialOffsetReader newInitialOffsetReader(SubscriptionPartition subscriptionPartition) {
+    checkSubscription(subscriptionPartition);
+    return options.getInitialOffsetReader(subscriptionPartition.partition());
+  }
+
+  /** Creates the committer for one subscription partition. */
+  private Committer newCommitter(SubscriptionPartition subscriptionPartition) {
+    checkSubscription(subscriptionPartition);
+    return options.getCommitter(subscriptionPartition.partition());
+  }
+
+  /**
+   * Looks up the topic of the configured subscription with a short-lived admin client. Failures
+   * are rethrown as the canonical {@link ApiException}.
+   */
+  private TopicPath getTopicPath() {
+    try (AdminClient admin =
+        AdminClient.create(
+            AdminClientSettings.newBuilder()
+                .setRegion(options.subscriptionPath().location().region())
+                .build())) {
+      return TopicPath.parse(admin.getSubscription(options.subscriptionPath()).get().getTopic());
+    } catch (Throwable t) {
+      throw toCanonical(t).underlying;
+    }
+  }
+
+  @Override
+  public PCollection<SequencedMessage> expand(PBegin input) {
+    PCollection<SubscriptionPartition> subscriptionPartitions;
+    if (options.partitions().isEmpty()) {
+      // No explicit partitions: let the loader produce them for the subscription's topic.
+      subscriptionPartitions =
+          input.apply(new SubscriptionPartitionLoader(getTopicPath(), options.subscriptionPath()));
+    } else {
+      // Explicit partitions: read exactly the configured set.
+      subscriptionPartitions =
+          input.apply(
+              Create.of(
+                  options.partitions().stream()
+                      .map(
+                          partition ->
+                              SubscriptionPartition.of(options.subscriptionPath(), partition))
+                      .collect(Collectors.toList())));
+    }
+
+    return subscriptionPartitions.apply(
+        ParDo.of(
+            new PerSubscriptionPartitionSdf(
+                MAX_SLEEP_TIME,
+                this::newInitialOffsetReader,
+                this::newRestrictionTracker,
+                this::newPartitionProcessor,
+                this::newCommitter)));
+  }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java
new file mode 100644
index 000000000..6919b0ced
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
+import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata;
+import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
+import com.google.cloud.pubsublite.internal.CursorClient;
+import com.google.cloud.pubsublite.internal.CursorClientSettings;
+import com.google.cloud.pubsublite.internal.wire.Committer;
+import com.google.cloud.pubsublite.internal.wire.CommitterSettings;
+import com.google.cloud.pubsublite.internal.wire.PubsubContext;
+import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
+import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
+import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
+import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
+import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
+import com.google.cloud.pubsublite.v1.SubscriberServiceSettings;
+import java.io.Serializable;
+import java.util.Set;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Options for reading a Pub/Sub Lite subscription, plus factory methods for the per-partition
+ * clients built from them.
+ */
+@AutoValue
+public abstract class SubscriberOptions implements Serializable {
+  private static final long serialVersionUID = 269598118L;
+
+  private static final Framework FRAMEWORK = Framework.of("BEAM");
+
+  private static final long MEBIBYTE = 1L << 20;
+
+  /** Default flow control: no limit on message count, 100 MiB of outstanding bytes. */
+  public static final FlowControlSettings DEFAULT_FLOW_CONTROL =
+      FlowControlSettings.builder()
+          .setMessagesOutstanding(Long.MAX_VALUE)
+          .setBytesOutstanding(100 * MEBIBYTE)
+          .build();
+
+  // Required parameters.
+  /** The subscription to read from. */
+  public abstract SubscriptionPath subscriptionPath();
+
+  // Optional parameters.
+  /** Per-partition flow control parameters for this subscription. */
+  public abstract FlowControlSettings flowControlSettings();
+
+  /**
+   * A set of partitions. If empty, continuously poll the set of partitions using an admin client.
+   */
+  public abstract Set<Partition> partitions();
+
+  /**
+   * A factory to override subscriber creation entirely and delegate to another method. Primarily
+   * useful for testing.
+   */
+  abstract @Nullable SubscriberFactory subscriberFactory();
+
+  /**
+   * A supplier to override committer creation entirely and delegate to another method. Primarily
+   * useful for testing.
+   */
+  abstract @Nullable SerializableSupplier<Committer> committerSupplier();
+
+  /**
+   * A supplier to override topic backlog reader creation entirely and delegate to another method.
+   * Primarily useful for testing.
+   */
+  abstract @Nullable SerializableSupplier<TopicBacklogReader> backlogReaderSupplier();
+
+  /**
+   * A supplier to override offset reader creation entirely and delegate to another method.
+   * Primarily useful for testing.
+   */
+  abstract @Nullable SerializableSupplier<InitialOffsetReader> offsetReaderSupplier();
+
+  /** Returns a builder with no explicit partitions and {@link #DEFAULT_FLOW_CONTROL}. */
+  public static Builder newBuilder() {
+    Builder builder = new AutoValue_SubscriberOptions.Builder();
+    return builder.setPartitions(ImmutableSet.of()).setFlowControlSettings(DEFAULT_FLOW_CONTROL);
+  }
+
+  public abstract Builder toBuilder();
+
+  /**
+   * Creates a subscriber service client for {@code partition} with the framework and routing
+   * metadata attached to its settings.
+   *
+   * @throws ApiException the canonical form of any failure raised while creating the client
+   */
+  private SubscriberServiceClient newSubscriberServiceClient(Partition partition)
+      throws ApiException {
+    try {
+      // Keep the builder returned by addDefaultMetadata rather than relying on in-place mutation.
+      SubscriberServiceSettings.Builder settingsBuilder =
+          addDefaultMetadata(
+              PubsubContext.of(FRAMEWORK),
+              RoutingMetadata.of(subscriptionPath(), partition),
+              SubscriberServiceSettings.newBuilder());
+      return SubscriberServiceClient.create(
+          addDefaultSettings(subscriptionPath().location().region(), settingsBuilder));
+    } catch (Throwable t) {
+      throw toCanonical(t).underlying;
+    }
+  }
+
+  /** Returns the overriding subscriber factory if set, otherwise one that builds a subscriber. */
+  SubscriberFactory getSubscriberFactory(Partition partition) {
+    SubscriberFactory factory = subscriberFactory();
+    if (factory != null) {
+      return factory;
+    }
+    return consumer ->
+        SubscriberBuilder.newBuilder()
+            .setMessageConsumer(consumer)
+            .setSubscriptionPath(subscriptionPath())
+            .setPartition(partition)
+            .setServiceClient(newSubscriberServiceClient(partition))
+            .build();
+  }
+
+  /** Returns the supplied committer if a supplier is set, otherwise a new one for the partition. */
+  Committer getCommitter(Partition partition) {
+    SerializableSupplier<Committer> supplier = committerSupplier();
+    if (supplier != null) {
+      return supplier.get();
+    }
+    return CommitterSettings.newBuilder()
+        .setSubscriptionPath(subscriptionPath())
+        .setPartition(partition)
+        .build()
+        .instantiate();
+  }
+
+  /** Returns the supplied backlog reader if a supplier is set, otherwise a new one. */
+  TopicBacklogReader getBacklogReader(Partition partition) {
+    SerializableSupplier<TopicBacklogReader> supplier = backlogReaderSupplier();
+    if (supplier != null) {
+      return supplier.get();
+    }
+    return TopicBacklogReaderSettings.newBuilder()
+        .setTopicPathFromSubscriptionPath(subscriptionPath())
+        .setPartition(partition)
+        .build()
+        .instantiate();
+  }
+
+  /** Returns the supplied initial offset reader if a supplier is set, otherwise a new one. */
+  InitialOffsetReader getInitialOffsetReader(Partition partition) {
+    SerializableSupplier<InitialOffsetReader> supplier = offsetReaderSupplier();
+    if (supplier != null) {
+      return supplier.get();
+    }
+    return new InitialOffsetReaderImpl(
+        CursorClient.create(
+            CursorClientSettings.newBuilder()
+                .setRegion(subscriptionPath().location().region())
+                .build()),
+        subscriptionPath(),
+        partition);
+  }
+
+  /** Builder for {@link SubscriberOptions}. */
+  @AutoValue.Builder
+  public abstract static class Builder {
+    // Required parameters.
+    public abstract Builder setSubscriptionPath(SubscriptionPath path);
+
+    // Optional parameters
+    public abstract Builder setPartitions(Set<Partition> partitions);
+
+    public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings);
+
+    // Used in unit tests
+    abstract Builder setSubscriberFactory(SubscriberFactory subscriberFactory);
+
+    abstract Builder setCommitterSupplier(SerializableSupplier<Committer> committerSupplier);
+
+    abstract Builder setBacklogReaderSupplier(
+        SerializableSupplier<TopicBacklogReader> backlogReaderSupplier);
+
+    abstract Builder setOffsetReaderSupplier(
+        SerializableSupplier<InitialOffsetReader> offsetReaderSupplier);
+
+    public abstract SubscriberOptions build();
+  }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartition.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartition.java
new file mode 100644
index 000000000..d8faa827d
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartition.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import org.apache.beam.sdk.coders.DefaultCoder;
+
+/** A subscription path paired with a partition, encoded by {@link SubscriptionPartitionCoder}. */
+@AutoValue
+@DefaultCoder(SubscriptionPartitionCoder.class)
+public abstract class SubscriptionPartition {
+  /** The subscription half of the pair. */
+  abstract SubscriptionPath subscription();
+
+  /** The partition half of the pair. */
+  abstract Partition partition();
+
+  /** Creates the pair for {@code subscription} and {@code partition}. */
+  static SubscriptionPartition of(SubscriptionPath subscription, Partition partition) {
+    return new AutoValue_SubscriptionPartition(subscription, partition);
+  }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionCoder.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionCoder.java
new file mode 100644
index 000000000..1e69959c4
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionCoder.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.coders.DelegateCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A coder for {@link SubscriptionPartition} that encodes it as a key/value pair of the
+ * subscription path string and the partition number.
+ */
+public class SubscriptionPartitionCoder extends AtomicCoder<SubscriptionPartition> {
+  // Delegates to a KV<String, Long> coder, converting with toKv/fromKv.
+  private static final Coder<SubscriptionPartition> CODER =
+      DelegateCoder.of(
+          KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()),
+          SubscriptionPartitionCoder::toKv,
+          SubscriptionPartitionCoder::fromKv);
+
+  /** Converts a value to its encodable form: subscription path string and partition number. */
+  private static KV<String, Long> toKv(SubscriptionPartition value) {
+    return KV.of(value.subscription().toString(), value.partition().value());
+  }
+
+  /** Rebuilds a value by parsing the subscription path and wrapping the partition number. */
+  private static SubscriptionPartition fromKv(KV<String, Long> kv) {
+    return SubscriptionPartition.of(
+        SubscriptionPath.parse(kv.getKey()), Partition.of(kv.getValue()));
+  }
+
+  @Override
+  public void encode(SubscriptionPartition value, OutputStream outStream) throws IOException {
+    CODER.encode(value, outStream);
+  }
+
+  @Override
+  public SubscriptionPartition decode(InputStream inStream) throws IOException {
+    return CODER.decode(inStream);
+  }
+
+  /** Returns a provider that supplies this coder for {@link SubscriptionPartition}. */
+  public static CoderProvider getCoderProvider() {
+    return CoderProviders.forCoder(
+        TypeDescriptor.of(SubscriptionPartition.class), new SubscriptionPartitionCoder());
+  }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionLoader.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionLoader.java
new file mode 100644
index 000000000..4bcab599c
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionLoader.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.PartitionLookupUtils;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.TopicPath;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollResult;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A transform that emits a {@link SubscriptionPartition} for each partition of a topic, polling
+ * the topic's partition count at a fixed interval.
+ */
+class SubscriptionPartitionLoader extends PTransform> {
+ private final TopicPath topic;
+ private final SubscriptionPath subscription;
+ // Looks up the number of partitions of a topic; replaceable through the testing constructor.
+ private final SerializableFunction getPartitionCount;
+ // Interval between two partition count polls.
+ private final Duration pollDuration;
+ // When true, polling ends after ten poll intervals; otherwise it never terminates.
+ private final boolean terminate;
+
+ /**
+ * Creates a loader that looks up partition counts with {@code
+ * PartitionLookupUtils.numPartitions}, polls once a minute and never terminates.
+ */
+ SubscriptionPartitionLoader(TopicPath topic, SubscriptionPath subscription) {
+ this(
+ topic,
+ subscription,
+ PartitionLookupUtils::numPartitions,
+ Duration.standardMinutes(1),
+ false);
+ }
+
+ /**
+ * Creates a loader with an explicit partition count lookup, poll interval and termination
+ * behavior.
+ *
+ * @param topic the topic whose partitions are enumerated.
+ * @param subscription the subscription attached to every emitted SubscriptionPartition.
+ * @param getPartitionCount returns the current number of partitions of a topic.
+ * @param pollDuration the interval between polls.
+ * @param terminate whether polling stops after ten times {@code pollDuration}.
+ */
+ @VisibleForTesting
+ SubscriptionPartitionLoader(
+ TopicPath topic,
+ SubscriptionPath subscription,
+ SerializableFunction getPartitionCount,
+ Duration pollDuration,
+ boolean terminate) {
+ this.topic = topic;
+ this.subscription = subscription;
+ this.getPartitionCount = getPartitionCount;
+ this.pollDuration = pollDuration;
+ this.terminate = terminate;
+ }
+
+ /**
+ * Seeds the pipeline with the topic, polls its partition count through {@code Watch.growthOf}
+ * and maps each reported partition to a {@link SubscriptionPartition} of the configured
+ * subscription.
+ */
+ @Override
+ public PCollection expand(PBegin input) {
+ PCollection start = input.apply(Create.of(ImmutableList.of(topic)));
+ PCollection> partitions =
+ start.apply(
+ Watch.growthOf(
+ new PollFn() {
+ @Override
+ public PollResult apply(TopicPath element, Context c) {
+ // The only element ever polled is the topic this loader was built for.
+ checkArgument(element.equals(topic));
+ int partitionCount = getPartitionCount.apply(element);
+ // Every poll reports the full range [0, partitionCount).
+ // NOTE(review): presumably Watch only emits partitions it has not already
+ // output -- confirm against the Watch.growthOf documentation.
+ List partitions =
+ IntStream.range(0, partitionCount)
+ .mapToObj(Partition::of)
+ .collect(Collectors.toList());
+ // Marked incomplete so that polling continues.
+ return PollResult.incomplete(Instant.now(), partitions);
+ }
+ })
+ .withPollInterval(pollDuration)
+ .withTerminationPerInput(
+ terminate
+ ? Watch.Growth.afterTotalOf(pollDuration.multipliedBy(10))
+ : Watch.Growth.never()));
+ // Only the value of each Watch output (the partition) is used; the key is dropped.
+ return partitions.apply(
+ MapElements.into(TypeDescriptor.of(SubscriptionPartition.class))
+ .via(kv -> SubscriptionPartition.of(subscription, kv.getValue())));
+ }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessor.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessor.java
new file mode 100644
index 000000000..b6a7c6cda
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessor.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.internal.CheckedApiException;
+import java.util.Optional;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.joda.time.Duration;
+
+/**
+ * A processor for a single subscription partition that can be started, awaited for a bounded
+ * amount of time and closed.
+ */
+interface SubscriptionPartitionProcessor extends AutoCloseable {
+ /**
+ * Starts processing.
+ *
+ * @throws CheckedApiException if the processor could not be started.
+ */
+ void start() throws CheckedApiException;
+
+ /**
+ * Waits for processing to complete.
+ *
+ * @param duration the time to wait; SubscriptionPartitionProcessorImpl uses it as the timeout
+ * of a single blocking wait.
+ * @return the continuation to hand back to the runner; SubscriptionPartitionProcessorImpl
+ * returns {@code stop()} once processing has completed and {@code resume()} when the wait
+ * timed out.
+ */
+ ProcessContinuation waitForCompletion(Duration duration);
+
+ /** Returns the last offset claimed so far, or empty if nothing has been claimed yet. */
+ Optional lastClaimed();
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorFactory.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorFactory.java
new file mode 100644
index 000000000..07e70b013
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import java.io.Serializable;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+
+/** A serializable factory for {@link SubscriptionPartitionProcessor} instances. */
+interface SubscriptionPartitionProcessorFactory extends Serializable {
+ // NOTE(review): a field declared in an interface is implicitly public static final, and Java
+ // serialization reads serialVersionUID only from the serializable class itself, so this value
+ // likely has no effect on implementing classes -- confirm before relying on it.
+ long serialVersionUID = 765145146544654L;
+
+ /**
+ * Creates a processor for one subscription partition.
+ *
+ * @param subscriptionPartition the subscription partition to process.
+ * @param tracker the restriction tracker handed to the processor.
+ * @param receiver the output receiver handed to the processor.
+ * @return a new processor.
+ */
+ SubscriptionPartitionProcessor newProcessor(
+ SubscriptionPartition subscriptionPartition,
+ RestrictionTracker tracker,
+ OutputReceiver receiver);
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorImpl.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorImpl.java
new file mode 100644
index 000000000..136fbca1a
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorImpl.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import com.google.api.core.ApiService.Listener;
+import com.google.api.core.ApiService.State;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
+import com.google.cloud.pubsublite.internal.CheckedApiException;
+import com.google.cloud.pubsublite.internal.ExtractStatus;
+import com.google.cloud.pubsublite.internal.wire.Subscriber;
+import com.google.cloud.pubsublite.proto.Cursor;
+import com.google.cloud.pubsublite.proto.FlowControlRequest;
+import com.google.cloud.pubsublite.proto.SeekRequest;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import com.google.protobuf.util.Timestamps;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A {@link SubscriptionPartitionProcessor} backed by a wire {@link Subscriber}.
+ *
+ * <p>Each batch delivered by the subscriber is claimed on the restriction tracker, emitted to the
+ * output receiver with the publish time as element timestamp, and its flow control tokens are
+ * then handed back to the subscriber. Processing completes when a claim is rejected, and fails
+ * when the subscriber fails or flow control tokens cannot be returned.
+ */
+class SubscriptionPartitionProcessorImpl extends Listener
+ implements SubscriptionPartitionProcessor {
+ private final RestrictionTracker tracker;
+ private final OutputReceiver receiver;
+ private final Subscriber subscriber;
+ // Completed with null when a claim is rejected, or exceptionally on failure.
+ private final SettableFuture completionFuture = SettableFuture.create();
+ private final FlowControlSettings flowControlSettings;
+ // Written from the subscriber's message callback and read through lastClaimed(). Volatile so
+ // the most recent value is visible to the reader, which is presumably on another thread.
+ private volatile Optional lastClaimedOffset = Optional.empty();
+
+ /**
+ * Creates a processor.
+ *
+ * @param tracker the restriction tracker on which message offsets are claimed.
+ * @param receiver the receiver to which claimed messages are output.
+ * @param subscriberFactory builds the subscriber from this processor's message handler.
+ * @param flowControlSettings the initial byte and message allowance granted to the subscriber.
+ */
+ @SuppressWarnings("methodref.receiver.bound.invalid")
+ SubscriptionPartitionProcessorImpl(
+ RestrictionTracker tracker,
+ OutputReceiver receiver,
+ Function>, Subscriber> subscriberFactory,
+ FlowControlSettings flowControlSettings) {
+ this.tracker = tracker;
+ this.receiver = receiver;
+ this.subscriber = subscriberFactory.apply(this::onMessages);
+ this.flowControlSettings = flowControlSettings;
+ }
+
+ /**
+ * Starts the subscriber, seeks it to the start of the current restriction and grants the
+ * initial flow control allowance.
+ *
+ * @throws CheckedApiException if the seek or the flow control request fails, or if the calling
+ * thread is interrupted while waiting for the seek to finish.
+ */
+ @Override
+ @SuppressWarnings("argument.type.incompatible")
+ public void start() throws CheckedApiException {
+ this.subscriber.addListener(this, MoreExecutors.directExecutor());
+ this.subscriber.startAsync();
+ this.subscriber.awaitRunning();
+ try {
+ this.subscriber
+ .seek(
+ SeekRequest.newBuilder()
+ .setCursor(Cursor.newBuilder().setOffset(tracker.currentRestriction().getFrom()))
+ .build())
+ .get();
+ this.subscriber.allowFlow(
+ FlowControlRequest.newBuilder()
+ .setAllowedBytes(flowControlSettings.bytesOutstanding())
+ .setAllowedMessages(flowControlSettings.messagesOutstanding())
+ .build());
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag so the caller can still observe the interruption.
+ Thread.currentThread().interrupt();
+ throw ExtractStatus.toCanonical(e);
+ } catch (ExecutionException e) {
+ throw ExtractStatus.toCanonical(e.getCause());
+ } catch (Throwable t) {
+ throw ExtractStatus.toCanonical(t);
+ }
+ }
+
+ /**
+ * Handles one batch of messages from the subscriber: claims the batch, outputs its messages
+ * and returns the batch's flow control tokens.
+ */
+ private void onMessages(List messages) {
+ if (completionFuture.isDone()) {
+ return;
+ }
+ if (messages.isEmpty()) {
+ // Nothing to claim or output; Iterables.getLast would throw on an empty batch.
+ return;
+ }
+ Offset lastOffset = Offset.of(Iterables.getLast(messages).getCursor().getOffset());
+ long byteSize = messages.stream().mapToLong(SequencedMessage::getSizeBytes).sum();
+ // The whole batch is claimed at once, using the offset of its last message.
+ if (tracker.tryClaim(OffsetByteProgress.of(lastOffset, byteSize))) {
+ lastClaimedOffset = Optional.of(lastOffset);
+ messages.forEach(
+ message ->
+ receiver.outputWithTimestamp(
+ message, new Instant(Timestamps.toMillis(message.getPublishTime()))));
+ try {
+ // Return exactly the tokens consumed by this batch.
+ subscriber.allowFlow(
+ FlowControlRequest.newBuilder()
+ .setAllowedBytes(byteSize)
+ .setAllowedMessages(messages.size())
+ .build());
+ } catch (CheckedApiException e) {
+ completionFuture.setException(e);
+ }
+ } else {
+ // The tracker rejected the claim: signal normal completion.
+ completionFuture.set(null);
+ }
+ }
+
+ /** Fails the completion future when the subscriber fails. */
+ @Override
+ public void failed(State from, Throwable failure) {
+ completionFuture.setException(ExtractStatus.toCanonical(failure));
+ }
+
+ /** Stops the subscriber and blocks until it has terminated. */
+ @Override
+ public void close() {
+ subscriber.stopAsync().awaitTerminated();
+ }
+
+ /**
+ * Waits up to {@code duration} for processing to complete.
+ *
+ * @return {@code stop()} if processing completed, {@code resume()} if the wait timed out.
+ * @throws com.google.api.gax.rpc.ApiException presumably the type of the rethrown {@code
+ * underlying} exception, raised if processing failed or the waiting thread was interrupted
+ * -- confirm against CheckedApiException.
+ */
+ @Override
+ @SuppressWarnings("argument.type.incompatible")
+ public ProcessContinuation waitForCompletion(Duration duration) {
+ try {
+ completionFuture.get(duration.getMillis(), TimeUnit.MILLISECONDS);
+ // CompletionFuture set with null when tryClaim returned false.
+ return ProcessContinuation.stop();
+ } catch (TimeoutException ignored) {
+ // Timed out waiting, yield to the runtime.
+ return ProcessContinuation.resume();
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag so the runner can still observe the interruption.
+ Thread.currentThread().interrupt();
+ throw ExtractStatus.toCanonical(e).underlying;
+ } catch (ExecutionException e) {
+ throw ExtractStatus.toCanonical(e.getCause()).underlying;
+ } catch (Throwable t) {
+ throw ExtractStatus.toCanonical(t).underlying;
+ }
+ }
+
+ /** Returns the offset of the last successfully claimed batch, or empty if none was claimed. */
+ @Override
+ public Optional lastClaimed() {
+ return lastClaimedOffset;
+ }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReader.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReader.java
new file mode 100644
index 000000000..419cf709b
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReader.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
+
+/**
+ * The TopicBacklogReader uses the TopicStats API to aggregate the backlog, or the distance between
+ * the current cursor and HEAD for a single {subscription, partition} pair.
+ */
+interface TopicBacklogReader extends AutoCloseable {
+ /**
+ * Compute and aggregate message statistics for messages between the provided start offset and
+ * HEAD. This method is blocking.
+ *
+ * @param offset The current offset of the subscriber.
+ * @return A ComputeMessageStatsResponse with the aggregated statistics for messages in the
+ * backlog.
+ * @throws ApiException if the statistics could not be computed.
+ */
+ ComputeMessageStatsResponse computeMessageStats(Offset offset) throws ApiException;
+
+ /** Closes this reader. Redeclared without a checked exception, unlike AutoCloseable's close. */
+ @Override
+ void close();
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderImpl.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderImpl.java
new file mode 100644
index 000000000..a3e0c0e28
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderImpl.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.internal.TopicStatsClient;
+import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nonnull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link TopicBacklogReader} that queries a {@link TopicStatsClient} for the message
+ * statistics of a single partition of a topic.
+ */
+final class TopicBacklogReaderImpl implements TopicBacklogReader {
+ private static final Logger LOG = LoggerFactory.getLogger(TopicBacklogReaderImpl.class);
+ private final TopicStatsClient client;
+ private final TopicPath topicPath;
+ private final Partition partition;
+
+ /**
+ * Creates a reader for one partition of a topic.
+ *
+ * @param client the client used for queries; it is closed by {@link #close()}.
+ * @param topicPath the topic whose backlog is read.
+ * @param partition the partition whose backlog is read.
+ */
+ public TopicBacklogReaderImpl(TopicStatsClient client, TopicPath topicPath, Partition partition) {
+ this.client = client;
+ this.topicPath = topicPath;
+ this.partition = partition;
+ }
+
+ /**
+ * Computes the statistics of all messages from {@code offset} to the end of the partition,
+ * blocking until the response arrives.
+ *
+ * @param offset the start of the range.
+ * @return the aggregated statistics for the range.
+ * @throws ApiException if the request fails or the calling thread is interrupted while waiting.
+ */
+ @Override
+ @SuppressWarnings("assignment.type.incompatible")
+ public ComputeMessageStatsResponse computeMessageStats(Offset offset) throws ApiException {
+ try {
+ // Offsets are 64-bit, so the open upper bound must be Long.MAX_VALUE: an int-sized bound
+ // falls below the start offset once a partition has advanced past 2^31 - 1.
+ return client
+ .computeMessageStats(topicPath, partition, offset, Offset.of(Long.MAX_VALUE))
+ .get();
+ } catch (ExecutionException e) {
+ @Nonnull Throwable cause = checkNotNull(e.getCause());
+ throw toCanonical(cause).underlying;
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag so callers further up the stack can still observe it.
+ Thread.currentThread().interrupt();
+ throw toCanonical(e).underlying;
+ }
+ }
+
+ /** Closes the underlying client; a failure to close is logged rather than propagated. */
+ @Override
+ public void close() {
+ try {
+ client.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to close topic stats client.", e);
+ }
+ }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderSettings.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderSettings.java
new file mode 100644
index 000000000..114cc19d7
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderSettings.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.pubsublite.AdminClient;
+import com.google.cloud.pubsublite.AdminClientSettings;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.internal.ExtractStatus;
+import com.google.cloud.pubsublite.internal.TopicStatsClient;
+import com.google.cloud.pubsublite.internal.TopicStatsClientSettings;
+import java.io.Serializable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nonnull;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ticker;
+
+/** Serializable settings from which a {@link TopicBacklogReader} can be instantiated. */
+@AutoValue
+abstract class TopicBacklogReaderSettings implements Serializable {
+ private static final long serialVersionUID = -4001752066450248673L;
+
+ /**
+ * The topic path for this backlog reader. It is set either directly through {@link
+ * Builder#setTopicPath} or by resolving a subscription through {@link
+ * Builder#setTopicPathFromSubscriptionPath}. Both write this same property, so the most recent
+ * call determines its value.
+ */
+ abstract TopicPath topicPath();
+
+ /** The partition whose backlog is read. */
+ abstract Partition partition();
+
+ /** Returns a new, empty builder. */
+ static Builder newBuilder() {
+ return new AutoValue_TopicBacklogReaderSettings.Builder();
+ }
+
+ /** Builder for {@link TopicBacklogReaderSettings}. */
+ @AutoValue.Builder
+ abstract static class Builder {
+
+ // Required parameters.
+ abstract Builder setTopicPath(TopicPath topicPath);
+
+ /**
+ * Looks up the topic of {@code subscriptionPath} with an admin client for the subscription's
+ * region and sets it as the topic path. This call blocks on the lookup.
+ *
+ * @throws ApiException if the lookup fails or the calling thread is interrupted while waiting.
+ */
+ @SuppressWarnings("assignment.type.incompatible")
+ Builder setTopicPathFromSubscriptionPath(SubscriptionPath subscriptionPath)
+ throws ApiException {
+ try (AdminClient adminClient =
+ AdminClient.create(
+ AdminClientSettings.newBuilder()
+ .setRegion(subscriptionPath.location().region())
+ .build())) {
+ return setTopicPath(
+ TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic()));
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag so callers further up the stack can still observe it.
+ Thread.currentThread().interrupt();
+ throw ExtractStatus.toCanonical(e).underlying;
+ } catch (ExecutionException e) {
+ @Nonnull Throwable cause = checkNotNull(e.getCause());
+ throw ExtractStatus.toCanonical(cause).underlying;
+ } catch (Throwable t) {
+ throw ExtractStatus.toCanonical(t).underlying;
+ }
+ }
+
+ abstract Builder setPartition(Partition partition);
+
+ abstract TopicBacklogReaderSettings build();
+ }
+
+ /**
+ * Creates a backlog reader for the configured topic and partition, using a topic stats client
+ * for the topic's region and wrapping the reader in a {@code LimitingTopicBacklogReader}.
+ */
+ TopicBacklogReader instantiate() throws ApiException {
+ TopicStatsClientSettings settings =
+ TopicStatsClientSettings.newBuilder().setRegion(topicPath().location().region()).build();
+ TopicBacklogReader impl =
+ new TopicBacklogReaderImpl(TopicStatsClient.create(settings), topicPath(), partition());
+ return new LimitingTopicBacklogReader(impl, Ticker.systemTicker());
+ }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/Uuid.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/Uuid.java
new file mode 100644
index 000000000..b3ff52f54
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/Uuid.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import com.google.auto.value.AutoValue;
+import com.google.protobuf.ByteString;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.UUID;
+import org.apache.beam.sdk.coders.DefaultCoder;
+
+/** A unique identifier that can be stored in a Pub/Sub Lite message attribute. */
+@DefaultCoder(UuidCoder.class)
+@AutoValue
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public abstract class Uuid {
+ /** The attribute name used for the Uuid by default. */
+ public static final String DEFAULT_ATTRIBUTE = "x-goog-pubsublite-dataflow-uuid";
+
+ /** The raw bytes of this Uuid. */
+ public abstract ByteString value();
+
+ /** Wraps the given bytes in a Uuid. */
+ public static Uuid of(ByteString value) {
+ return new AutoValue_Uuid(value);
+ }
+
+ /**
+ * Creates a Uuid from a randomly generated {@link UUID}, written as its most significant long
+ * followed by its least significant long.
+ */
+ public static Uuid random() {
+ UUID generated = UUID.randomUUID();
+ ByteString.Output buffer = ByteString.newOutput(16);
+ try {
+ DataOutputStream writer = new DataOutputStream(buffer);
+ writer.writeLong(generated.getMostSignificantBits());
+ writer.writeLong(generated.getLeastSignificantBits());
+ } catch (IOException e) {
+ throw new RuntimeException("Should never have an IOException since there is no io.", e);
+ }
+ return of(buffer.toByteString());
+ }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/UuidCoder.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/UuidCoder.java
new file mode 100644
index 000000000..b6305a571
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/UuidCoder.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.coders.DelegateCoder;
+import org.apache.beam.sdk.extensions.protobuf.ByteStringCoder;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** A coder for a Uuid, delegating to a ByteString coder over the Uuid's value. */
+public class UuidCoder extends AtomicCoder {
+ // Shared delegate: unwraps a Uuid with Uuid::value and rewraps decoded bytes with Uuid::of.
+ private static final Coder CODER =
+ DelegateCoder.of(ByteStringCoder.of(), Uuid::value, Uuid::of);
+
+ /** Writes {@code value} to {@code outStream} using the delegate coder. */
+ @Override
+ public void encode(Uuid value, OutputStream outStream) throws IOException {
+ CODER.encode(value, outStream);
+ }
+
+ /** Reads a Uuid from {@code inStream} using the delegate coder. */
+ @Override
+ public Uuid decode(InputStream inStream) throws IOException {
+ return CODER.decode(inStream);
+ }
+
+ /** Returns a provider that supplies a new instance of this coder for Uuid. */
+ public static CoderProvider getCoderProvider() {
+ return CoderProviders.forCoder(TypeDescriptor.of(Uuid.class), new UuidCoder());
+ }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/UuidDeduplicationOptions.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/UuidDeduplicationOptions.java
new file mode 100644
index 000000000..bcc1c6be8
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/UuidDeduplicationOptions.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import static com.google.cloud.pubsublite.internal.UncheckedApiPreconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import com.google.protobuf.ByteString;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.Deduplicate;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.joda.time.Duration;
+
+/** Options for deduplicating Pub/Sub Lite messages based on the UUID they were published with. */
+@AutoValue
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public abstract class UuidDeduplicationOptions implements Serializable {
+ private static final long serialVersionUID = 9837489720893L;
+
+ /**
+ * The default extractor: reads the Uuid from the {@link Uuid#DEFAULT_ATTRIBUTE} attribute of a
+ * message. The argument checks fail if the attribute is missing or does not hold exactly one
+ * value. Note that the second check's message mentions duplicates, but it also fires for an
+ * attribute with no values.
+ */
+ public static final SerializableFunction DEFAULT_UUID_EXTRACTOR =
+ message -> {
+ checkArgument(
+ message.getMessage().getAttributesMap().containsKey(Uuid.DEFAULT_ATTRIBUTE),
+ "Uuid attribute missing.");
+ List attributes =
+ message.getMessage().getAttributesMap().get(Uuid.DEFAULT_ATTRIBUTE).getValuesList();
+ checkArgument(attributes.size() == 1, "Duplicate Uuid attribute values exist.");
+ return Uuid.of(attributes.get(0));
+ };
+
+ // NOTE(review): not referenced anywhere in this class -- confirm whether it is still needed.
+ public static final int DEFAULT_HASH_PARTITIONS = 10000;
+
+ /** The time domain of the default deduplication transform. */
+ public static final TimeDomain DEFAULT_TIME_DOMAIN = TimeDomain.EVENT_TIME;
+ /** The duration of the default deduplication transform. */
+ public static final Duration DEFAULT_DEDUPLICATE_DURATION = Duration.standardDays(1);
+
+ // All parameters are optional.
+ /** The function that extracts the Uuid from a message. */
+ public abstract SerializableFunction uuidExtractor();
+
+ /** The deduplication transform applied to the messages keyed by Uuid. */
+ public abstract Deduplicate.KeyedValues deduplicate();
+
+ /**
+ * Returns a builder preset with {@link #DEFAULT_UUID_EXTRACTOR} and a keyed-values
+ * deduplication using {@link #DEFAULT_TIME_DOMAIN} and {@link #DEFAULT_DEDUPLICATE_DURATION}.
+ */
+ @SuppressWarnings("CheckReturnValue")
+ public static Builder newBuilder() {
+ Builder builder = new AutoValue_UuidDeduplicationOptions.Builder();
+ builder.setUuidExtractor(DEFAULT_UUID_EXTRACTOR);
+ builder.setDeduplicate(
+ Deduplicate.keyedValues()
+ .withTimeDomain(DEFAULT_TIME_DOMAIN)
+ .withDuration(DEFAULT_DEDUPLICATE_DURATION));
+ return builder;
+ }
+
+ /** Builder for {@link UuidDeduplicationOptions}. */
+ @AutoValue.Builder
+ public abstract static class Builder {
+ /** Set the function that extracts the Uuid from a message. */
+ public abstract Builder setUuidExtractor(
+ SerializableFunction uuidExtractor);
+
+ /**
+ * Set the deduplication transform.
+ *
+ * <pre>{@code
+ * UuidDeduplicationOptions.Builder builder = UuidDeduplicationOptions.newBuilder();
+ * builder.setDeduplicate(Deduplicate.keyedValues()
+ * .withTimeDomain(TimeDomain.PROCESSING_TIME));
+ * }</pre>
+ */
+ public abstract Builder setDeduplicate(
+ Deduplicate.KeyedValues deduplicate);
+
+ /** Builds the options. */
+ public abstract UuidDeduplicationOptions build();
+ }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/UuidDeduplicationTransform.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/UuidDeduplicationTransform.java
new file mode 100644
index 000000000..54ab46d59
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/UuidDeduplicationTransform.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ProcessFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A transform for deduplicating Pub/Sub Lite messages based on the UUID they were published with.
+ *
+ * Messages are keyed by the Uuid returned by the configured extractor, passed through the
+ * configured deduplication transform, and then stripped of their keys again.
+ */
+class UuidDeduplicationTransform
+ extends PTransform, PCollection> {
+ private final UuidDeduplicationOptions options;
+
+ /** Creates a transform using the extractor and deduplication transform from {@code options}. */
+ UuidDeduplicationTransform(UuidDeduplicationOptions options) {
+ this.options = options;
+ }
+
+ /** Applies the three steps "MapUuids", "Deduplicate" and "StripUuids" to {@code input}. */
+ @Override
+ public PCollection expand(PCollection input) {
+ // Pair every message with the Uuid extracted from it.
+ ProcessFunction> mapWithKeys =
+ message -> KV.of(options.uuidExtractor().apply(message), message);
+ PCollection> uuidMapped =
+ input.apply(
+ "MapUuids",
+ MapElements.into(new TypeDescriptor>() {}).via(mapWithKeys));
+ PCollection> unique =
+ uuidMapped.apply("Deduplicate", options.deduplicate());
+ // Drop the Uuid keys so that only the messages remain.
+ return unique.apply("StripUuids", Values.create());
+ }
+}
diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/package-info.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/package-info.java
new file mode 100644
index 000000000..479275ab3
--- /dev/null
+++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Defines transforms for reading and writing from Google Cloud Pub/Sub Lite. */
+@CheckReturnValue
+@ParametersAreNonnullByDefault
+package com.google.cloud.pubsublite.beam;
+
+import com.google.errorprone.annotations.CheckReturnValue;
+import javax.annotation.ParametersAreNonnullByDefault;
diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/AddUuidsTransformTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/AddUuidsTransformTest.java
new file mode 100644
index 000000000..f52ac4c40
--- /dev/null
+++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/AddUuidsTransformTest.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import com.google.cloud.pubsublite.proto.PubSubMessage;
+import com.google.protobuf.ByteString;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests that {@link AddUuidsTransform} emits every input message exactly once and tags each one
+ * with a distinct value under {@link Uuid#DEFAULT_ATTRIBUTE}.
+ */
+@RunWith(JUnit4.class)
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public final class AddUuidsTransformTest {
+  @Rule public final TestPipeline pipeline = TestPipeline.create();
+
+  /** Builds a message whose key is the decimal string form of {@code identifier}. */
+  private static PubSubMessage newMessage(int identifier) {
+    return PubSubMessage.newBuilder()
+        .setKey(ByteString.copyFromUtf8(Integer.toString(identifier)))
+        .build();
+  }
+
+  /**
+   * Returns a checker that throws unless the message keys are exactly {@code identifiers}, each
+   * seen once, and every message carries exactly one UUID attribute value that no other message
+   * shares.
+   */
+  private static SerializableFunction<Iterable<PubSubMessage>, Void> identifiersInAnyOrder(
+      Set<Integer> identifiers) {
+    return messages -> {
+      // Work on a copy so that evaluating the checker never mutates the captured set.
+      Set<Integer> remaining = new HashSet<>(identifiers);
+      Set<Uuid> uuids = new HashSet<>();
+      messages.forEach(
+          message -> {
+            int identifier = Integer.parseInt(message.getKey().toStringUtf8());
+            if (!remaining.remove(identifier)) {
+              throw new IllegalStateException("Duplicate or unexpected element " + identifier);
+            }
+            if (!uuids.add(
+                Uuid.of(
+                    Iterables.getOnlyElement(
+                        message.getAttributesMap().get(Uuid.DEFAULT_ATTRIBUTE).getValuesList())))) {
+              throw new IllegalStateException("Invalid duplicate Uuid: " + message);
+            }
+          });
+      if (!remaining.isEmpty()) {
+        throw new IllegalStateException("Elements not in collection: " + remaining);
+      }
+      return null;
+    };
+  }
+
+  /** All elements arrive in a single batch. */
+  @Test
+  public void messagesSameBatch() {
+    TestStream<PubSubMessage> messageStream =
+        TestStream.create(ProtoCoder.of(PubSubMessage.class))
+            .addElements(newMessage(1), newMessage(2), newMessage(85))
+            .advanceWatermarkToInfinity();
+    PCollection<PubSubMessage> outputs =
+        pipeline.apply(messageStream).apply(new AddUuidsTransform());
+    PAssert.that(outputs).satisfies(identifiersInAnyOrder(Sets.newHashSet(1, 2, 85)));
+    pipeline.run();
+  }
+
+  /** A one-day processing-time gap separates the first two elements from the third. */
+  @Test
+  public void messagesTimeDelayed() {
+    TestStream<PubSubMessage> messageStream =
+        TestStream.create(ProtoCoder.of(PubSubMessage.class))
+            .addElements(newMessage(1), newMessage(2))
+            .advanceProcessingTime(Duration.standardDays(1))
+            .addElements(newMessage(85))
+            .advanceWatermarkToInfinity();
+    PCollection<PubSubMessage> outputs =
+        pipeline.apply(messageStream).apply(new AddUuidsTransform());
+    PAssert.that(outputs).satisfies(identifiersInAnyOrder(Sets.newHashSet(1, 2, 85)));
+    pipeline.run();
+  }
+}
diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/FakeSerializable.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/FakeSerializable.java
new file mode 100644
index 000000000..7f44b0b7f
--- /dev/null
+++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/FakeSerializable.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import java.io.Serializable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A FakeSerializable hides a non-serializable object in a static map and returns a handle into the
+ * static map. It is useful in the presence of in-process serialization, but not out of process
+ * serialization.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+final class FakeSerializable {
+  private static final AtomicInteger idCounter = new AtomicInteger(0);
+  private static final ConcurrentHashMap<Integer, Object> map = new ConcurrentHashMap<>();
+
+  private FakeSerializable() {}
+
+  /** Serializable reference to a value held in the static map; carries only the integer id. */
+  static class Handle<T> implements Serializable {
+    private final int id;
+
+    private Handle(int id) {
+      this.id = id;
+    }
+
+    /** Returns the value stored under this handle's id, or {@code null} if there is none. */
+    @SuppressWarnings("unchecked")
+    T get() {
+      return (T) map.get(id);
+    }
+  }
+
+  /** Stores {@code value} under a freshly allocated id and returns a handle to it. */
+  static <T> Handle<T> put(T value) {
+    int id = idCounter.incrementAndGet();
+    map.put(id, value);
+    return new Handle<>(id);
+  }
+
+  /** Stores {@code value} and returns a supplier that fetches it through its handle. */
+  static <T> SerializableSupplier<T> getSupplier(T value) {
+    Handle<T> handle = put(value);
+    return handle::get;
+  }
+}
diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTrackerTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTrackerTest.java
new file mode 100644
index 000000000..47e5ae54c
--- /dev/null
+++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTrackerTest.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.StatusCode.Code;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.internal.CheckedApiException;
+import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ticker;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Spy;
+
+/** Unit tests for {@link OffsetByteRangeTracker} driven by a mocked backlog reader and ticker. */
+@RunWith(JUnit4.class)
+@SuppressWarnings("initialization.fields.uninitialized")
+public class OffsetByteRangeTrackerTest {
+  private static final double IGNORED_FRACTION = -10000000.0;
+  private static final long MIN_BYTES = 1000;
+  private static final OffsetRange RANGE = new OffsetRange(123L, Long.MAX_VALUE);
+  private final TopicBacklogReader reader = mock(TopicBacklogReader.class);
+
+  @Spy Ticker ticker;
+  private OffsetByteRangeTracker tracker;
+
+  @Before
+  public void setUp() {
+    initMocks(this);
+    when(ticker.read()).thenReturn(0L);
+    tracker =
+        new OffsetByteRangeTracker(
+            RANGE, reader, Stopwatch.createUnstarted(ticker), Duration.millis(500), MIN_BYTES);
+  }
+
+  @Test
+  public void progressTracked() {
+    assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(123), 10)));
+    assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(124), 11)));
+    when(reader.computeMessageStats(Offset.of(125)))
+        .thenReturn(ComputeMessageStatsResponse.newBuilder().setMessageBytes(1000).build());
+    Progress progress = tracker.getProgress();
+    assertEquals(21, progress.getWorkCompleted(), .0001);
+    assertEquals(1000, progress.getWorkRemaining(), .0001);
+  }
+
+  @Test
+  public void getProgressStatsFailure() {
+    when(reader.computeMessageStats(Offset.of(123)))
+        .thenThrow(new CheckedApiException(Code.INTERNAL).underlying);
+    assertThrows(ApiException.class, tracker::getProgress);
+  }
+
+  @Test
+  @SuppressWarnings({"dereference.of.nullable", "argument.type.incompatible"})
+  public void claimSplitSuccess() {
+    assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_000), MIN_BYTES)));
+    assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(10_000), MIN_BYTES)));
+    SplitResult<OffsetRange> splits = tracker.trySplit(IGNORED_FRACTION);
+    assertEquals(RANGE.getFrom(), splits.getPrimary().getFrom());
+    assertEquals(10_001, splits.getPrimary().getTo());
+    assertEquals(10_001, splits.getResidual().getFrom());
+    assertEquals(Long.MAX_VALUE, splits.getResidual().getTo());
+    assertEquals(splits.getPrimary(), tracker.currentRestriction());
+    tracker.checkDone();
+    assertNull(tracker.trySplit(IGNORED_FRACTION));
+  }
+
+  @Test
+  @SuppressWarnings({"dereference.of.nullable", "argument.type.incompatible"})
+  public void splitWithoutClaimEmpty() {
+    when(ticker.read()).thenReturn(100_000_000_000_000L);
+    SplitResult<OffsetRange> splits = tracker.trySplit(IGNORED_FRACTION);
+    assertEquals(RANGE.getFrom(), splits.getPrimary().getFrom());
+    assertEquals(RANGE.getFrom(), splits.getPrimary().getTo());
+    assertEquals(RANGE, splits.getResidual());
+    assertEquals(splits.getPrimary(), tracker.currentRestriction());
+    tracker.checkDone();
+    assertNull(tracker.trySplit(IGNORED_FRACTION));
+  }
+
+  @Test
+  public void unboundedNotDone() {
+    assertThrows(IllegalStateException.class, tracker::checkDone);
+  }
+
+  @Test
+  public void cannotClaimBackwards() {
+    assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_000), MIN_BYTES)));
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_000), MIN_BYTES)));
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> tracker.tryClaim(OffsetByteProgress.of(Offset.of(999), MIN_BYTES)));
+  }
+
+  @Test
+  public void cannotClaimSplitRange() {
+    assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_000), MIN_BYTES)));
+    assertNotNull(tracker.trySplit(IGNORED_FRACTION));
+    assertFalse(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_001), MIN_BYTES)));
+  }
+
+  @Test
+  public void cannotSplitNotEnoughBytesOrTime() {
+    assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_000), MIN_BYTES - 2)));
+    assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_001), 1)));
+    // MIN_BYTES - 1 bytes claimed; ticker at 100 ms (nanosecond units), under setUp's 500 ms.
+    when(ticker.read()).thenReturn(100_000_000L);
+    assertNull(tracker.trySplit(IGNORED_FRACTION));
+  }
+
+  @Test
+  public void canSplitTimeOnly() {
+    assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_000), MIN_BYTES - 2)));
+    assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_001), 1)));
+    // Still MIN_BYTES - 1 bytes claimed, but the ticker now reads one full second.
+    when(ticker.read()).thenReturn(1_000_000_000L);
+    assertNotNull(tracker.trySplit(IGNORED_FRACTION));
+  }
+
+  @Test
+  public void canSplitBytesOnly() {
+    assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_000), MIN_BYTES - 2)));
+    assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_001), 2)));
+    // Exactly MIN_BYTES bytes claimed while the ticker reads only 100 ms.
+    when(ticker.read()).thenReturn(100_000_000L);
+    assertNotNull(tracker.trySplit(IGNORED_FRACTION));
+  }
+}
diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java
new file mode 100644
index 000000000..5516e07aa
--- /dev/null
+++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java
@@ -0,0 +1,194 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.example;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.StatusCode.Code;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.internal.CheckedApiException;
+import com.google.cloud.pubsublite.internal.testing.FakeApiService;
+import com.google.cloud.pubsublite.internal.wire.Committer;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nonnull;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.SerializableBiFunction;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.stubbing.Answer;
+
+/** Unit tests for {@link PerSubscriptionPartitionSdf} with all of its factories mocked. */
+@RunWith(JUnit4.class)
+@SuppressWarnings("initialization.fields.uninitialized")
+public class PerSubscriptionPartitionSdfTest {
+  private static final Duration MAX_SLEEP_TIME =
+      Duration.standardMinutes(10).plus(Duration.millis(10));
+  private static final OffsetRange RESTRICTION = new OffsetRange(1, Long.MAX_VALUE);
+  private static final SubscriptionPartition PARTITION =
+      SubscriptionPartition.of(example(SubscriptionPath.class), example(Partition.class));
+
+  @Mock SerializableFunction<SubscriptionPartition, InitialOffsetReader> offsetReaderFactory;
+
+  @Mock
+  SerializableBiFunction<
+          SubscriptionPartition, OffsetRange, RestrictionTracker<OffsetRange, OffsetByteProgress>>
+      trackerFactory;
+
+  @Mock SubscriptionPartitionProcessorFactory processorFactory;
+  @Mock SerializableFunction<SubscriptionPartition, Committer> committerFactory;
+
+  @Mock InitialOffsetReader initialOffsetReader;
+  @Spy RestrictionTracker<OffsetRange, OffsetByteProgress> tracker;
+  @Mock OutputReceiver<SequencedMessage> output;
+  @Mock BundleFinalizer finalizer;
+  @Mock SubscriptionPartitionProcessor processor;
+
+  abstract static class FakeCommitter extends FakeApiService implements Committer {}
+
+  @Spy FakeCommitter committer;
+
+  PerSubscriptionPartitionSdf sdf;
+
+  @Before
+  public void setUp() {
+    initMocks(this);
+    when(offsetReaderFactory.apply(any())).thenReturn(initialOffsetReader);
+    when(processorFactory.newProcessor(any(), any(), any())).thenReturn(processor);
+    when(trackerFactory.apply(any(), any())).thenReturn(tracker);
+    when(committerFactory.apply(any())).thenReturn(committer);
+    when(tracker.currentRestriction()).thenReturn(RESTRICTION);
+    sdf =
+        new PerSubscriptionPartitionSdf(
+            MAX_SLEEP_TIME,
+            offsetReaderFactory,
+            trackerFactory,
+            processorFactory,
+            committerFactory);
+  }
+
+  @Test
+  public void getInitialRestrictionReadSuccess() {
+    when(initialOffsetReader.read()).thenReturn(example(Offset.class));
+    OffsetRange range = sdf.getInitialRestriction(PARTITION);
+    assertEquals(example(Offset.class).value(), range.getFrom());
+    assertEquals(Long.MAX_VALUE, range.getTo());
+    verify(offsetReaderFactory).apply(PARTITION);
+  }
+
+  @Test
+  public void getInitialRestrictionReadFailure() {
+    when(initialOffsetReader.read()).thenThrow(new CheckedApiException(Code.INTERNAL).underlying);
+    assertThrows(ApiException.class, () -> sdf.getInitialRestriction(PARTITION));
+  }
+
+  @Test
+  public void newTrackerCallsFactory() {
+    assertSame(tracker, sdf.newTracker(PARTITION, RESTRICTION));
+    verify(trackerFactory).apply(PARTITION, RESTRICTION);
+  }
+
+  @Test
+  @SuppressWarnings("argument.type.incompatible")
+  public void process() throws Exception {
+    when(processor.waitForCompletion(MAX_SLEEP_TIME)).thenReturn(ProcessContinuation.resume());
+    // Claims on the tracker handed to the factory must return what the spy's tryClaim returns.
+    when(processorFactory.newProcessor(any(), any(), any()))
+        .thenAnswer(
+            args -> {
+              @Nonnull
+              RestrictionTracker<OffsetRange, OffsetByteProgress> wrapped = args.getArgument(1);
+              when(tracker.tryClaim(any())).thenReturn(true).thenReturn(false);
+              assertTrue(wrapped.tryClaim(OffsetByteProgress.of(example(Offset.class), 123)));
+              assertFalse(wrapped.tryClaim(OffsetByteProgress.of(Offset.of(333333), 123)));
+              return processor;
+            });
+    // Capture the callback registered with the bundle finalizer so it can be invoked below.
+    AtomicReference<BundleFinalizer.Callback> callbackRef = new AtomicReference<>(null);
+    doAnswer(
+            (Answer<Void>)
+                args -> {
+                  callbackRef.set(args.getArgument(1));
+                  return null;
+                })
+        .when(finalizer)
+        .afterBundleCommit(any(), any());
+    doReturn(Optional.of(example(Offset.class))).when(processor).lastClaimed();
+    assertEquals(
+        ProcessContinuation.resume(), sdf.processElement(tracker, PARTITION, output, finalizer));
+    verify(processorFactory).newProcessor(eq(PARTITION), any(), eq(output));
+    InOrder order = inOrder(processor);
+    order.verify(processor).start();
+    order.verify(processor).waitForCompletion(MAX_SLEEP_TIME);
+    order.verify(processor).lastClaimed();
+    order.verify(processor).close();
+    verify(finalizer).afterBundleCommit(eq(Instant.ofEpochMilli(Long.MAX_VALUE)), any());
+    assertNotNull(callbackRef.get());
+    when(committer.commitOffset(any())).thenReturn(ApiFutures.immediateFuture(null));
+    // Running the callback must commit the offset one past the last claimed offset.
+    callbackRef.get().onBundleSuccess();
+    InOrder order2 = inOrder(committerFactory, committer);
+    order2.verify(committer).startAsync();
+    order2.verify(committer).awaitRunning();
+    order2.verify(committer).commitOffset(Offset.of(example(Offset.class).value() + 1));
+    order2.verify(committer).stopAsync();
+    order2.verify(committer).awaitTerminated();
+  }
+
+  @Test
+  @SuppressWarnings("return.type.incompatible")
+  public void dofnIsSerializable() throws Exception {
+    try (ObjectOutputStream output = new ObjectOutputStream(new ByteArrayOutputStream())) {
+      output.writeObject(
+          new PerSubscriptionPartitionSdf(
+              MAX_SLEEP_TIME, x -> null, (x, y) -> null, (x, y, z) -> null, (x) -> null));
+    }
+  }
+}
diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PubsubLiteSinkTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PubsubLiteSinkTest.java
new file mode 100644
index 000000000..4b80ecfbd
--- /dev/null
+++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PubsubLiteSinkTest.java
@@ -0,0 +1,251 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.api.core.ApiFutures;
+import com.google.api.core.ApiService;
+import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.rpc.StatusCode.Code;
+import com.google.cloud.pubsublite.CloudRegion;
+import com.google.cloud.pubsublite.CloudZone;
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.MessageMetadata;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.ProjectNumber;
+import com.google.cloud.pubsublite.TopicName;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.internal.CheckedApiException;
+import com.google.cloud.pubsublite.internal.ExtractStatus;
+import com.google.cloud.pubsublite.internal.Publisher;
+import com.google.cloud.pubsublite.internal.testing.FakeApiService;
+import com.google.protobuf.ByteString;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.MockitoAnnotations;
+import org.mockito.Spy;
+import org.mockito.stubbing.Answer;
+
+/** Tests {@link PubsubLiteSink} using a spied fake publisher placed in the publisher cache. */
+@RunWith(JUnit4.class)
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class PubsubLiteSinkTest {
+  @Rule public final TestPipeline pipeline = TestPipeline.create();
+
+  abstract static class PublisherFakeService extends FakeApiService
+      implements Publisher<MessageMetadata> {}
+
+  @Spy private PublisherFakeService publisher;
+
+  private PublisherOptions defaultOptions() {
+    return PublisherOptions.newBuilder()
+        .setTopicPath(
+            TopicPath.newBuilder()
+                .setProject(ProjectNumber.of(9))
+                .setName(TopicName.of("abc"))
+                .setLocation(CloudZone.of(CloudRegion.of("us-east1"), 'a'))
+                .build())
+        .build();
+  }
+
+  private final PubsubLiteSink sink = new PubsubLiteSink(defaultOptions());
+
+  // Initialized in setUp.
+  private ApiService.Listener listener;
+
+  @Captor
+  final ArgumentCaptor<Message> publishedMessageCaptor = ArgumentCaptor.forClass(Message.class);
+
+  private void runWith(Message... messages) {
+    pipeline
+        .apply(
+            Create.of(Arrays.stream(messages).map(Message::toProto).collect(Collectors.toList())))
+        .apply(ParDo.of(sink));
+    pipeline.run();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    PerServerPublisherCache.PUBLISHER_CACHE.set(defaultOptions(), publisher);
+    doAnswer(
+            (Answer<Void>)
+                args -> {
+                  listener = args.getArgument(0);
+                  return null;
+                })
+        .when(publisher)
+        .addListener(any(), any());
+    sink.setup();
+    verify(publisher).addListener(any(), any());
+  }
+
+  @Test
+  public void singleMessagePublishes() throws Exception {
+    when(publisher.publish(Message.builder().build()))
+        .thenReturn(ApiFutures.immediateFuture(MessageMetadata.of(Partition.of(1), Offset.of(2))));
+    runWith(Message.builder().build());
+    verify(publisher).publish(Message.builder().build());
+  }
+
+  @Test
+  public void manyMessagePublishes() throws Exception {
+    Message message1 = Message.builder().build();
+    Message message2 = Message.builder().setKey(ByteString.copyFromUtf8("abc")).build();
+    when(publisher.publish(message1))
+        .thenReturn(ApiFutures.immediateFuture(MessageMetadata.of(Partition.of(1), Offset.of(2))));
+    when(publisher.publish(message2))
+        .thenReturn(ApiFutures.immediateFuture(MessageMetadata.of(Partition.of(85), Offset.of(3))));
+    runWith(message1, message2);
+    verify(publisher, times(2)).publish(publishedMessageCaptor.capture());
+    assertThat(publishedMessageCaptor.getAllValues(), containsInAnyOrder(message1, message2));
+  }
+
+  @Test
+  public void singleExceptionWhenProcessing() {
+    Message message1 = Message.builder().build();
+    when(publisher.publish(message1))
+        .thenReturn(
+            ApiFutures.immediateFailedFuture(new CheckedApiException(Code.INTERNAL).underlying));
+    PipelineExecutionException e =
+        assertThrows(PipelineExecutionException.class, () -> runWith(message1));
+    verify(publisher).publish(message1);
+    Optional<CheckedApiException> statusOr = ExtractStatus.extract(e.getCause());
+    assertTrue(statusOr.isPresent());
+    assertThat(statusOr.get().code(), equalTo(Code.INTERNAL));
+  }
+
+  @Test
+  public void exceptionMixedWithOK() throws Exception {
+    Message message1 = Message.builder().build();
+    Message message2 = Message.builder().setKey(ByteString.copyFromUtf8("abc")).build();
+    Message message3 = Message.builder().setKey(ByteString.copyFromUtf8("def")).build();
+    SettableApiFuture<MessageMetadata> future1 = SettableApiFuture.create();
+    SettableApiFuture<MessageMetadata> future2 = SettableApiFuture.create();
+    SettableApiFuture<MessageMetadata> future3 = SettableApiFuture.create();
+    CountDownLatch startedLatch = new CountDownLatch(3);
+    when(publisher.publish(message1))
+        .then(
+            invocation -> {
+              startedLatch.countDown();
+              return future1;
+            });
+    when(publisher.publish(message2))
+        .then(
+            invocation -> {
+              startedLatch.countDown();
+              return future2;
+            });
+    when(publisher.publish(message3))
+        .then(
+            invocation -> {
+              startedLatch.countDown();
+              return future3;
+            });
+    ExecutorService exec = Executors.newCachedThreadPool();
+    try {
+      // Resolve the futures only after all three publishes have been issued.
+      exec.execute(
+          () -> {
+            try {
+              startedLatch.await();
+              future1.set(MessageMetadata.of(Partition.of(1), Offset.of(2)));
+              future2.setException(new CheckedApiException(Code.INTERNAL).underlying);
+              future3.set(MessageMetadata.of(Partition.of(1), Offset.of(3)));
+            } catch (InterruptedException e) {
+              // An assertion failure raised on this worker thread would not reach the test
+              // thread, so restore the interrupt flag and surface the cause instead.
+              Thread.currentThread().interrupt();
+              throw new RuntimeException(e);
+            }
+          });
+      PipelineExecutionException e =
+          assertThrows(
+              PipelineExecutionException.class, () -> runWith(message1, message2, message3));
+      verify(publisher, times(3)).publish(publishedMessageCaptor.capture());
+      assertThat(
+          publishedMessageCaptor.getAllValues(), containsInAnyOrder(message1, message2, message3));
+      Optional<CheckedApiException> statusOr = ExtractStatus.extract(e.getCause());
+      assertTrue(statusOr.isPresent());
+      assertThat(statusOr.get().code(), equalTo(Code.INTERNAL));
+    } finally {
+      exec.shutdownNow();
+    }
+  }
+
+  @Test
+  public void listenerExceptionOnBundleFinish() throws Exception {
+    Message message1 = Message.builder().build();
+    SettableApiFuture<MessageMetadata> future = SettableApiFuture.create();
+
+    SettableApiFuture<Void> publishFuture = SettableApiFuture.create();
+    when(publisher.publish(message1))
+        .thenAnswer(
+            args -> {
+              publishFuture.set(null);
+              return future;
+            });
+    ExecutorService exec = Executors.newSingleThreadExecutor();
+    try {
+      Future<?> executorFuture =
+          exec.submit(
+              () -> {
+                PipelineExecutionException e =
+                    assertThrows(PipelineExecutionException.class, () -> runWith(message1));
+                Optional<CheckedApiException> statusOr = ExtractStatus.extract(e.getCause());
+                assertTrue(statusOr.isPresent());
+                assertThat(statusOr.get().code(), equalTo(Code.INTERNAL));
+              });
+      // Wait until publish() has been invoked, then fail the listener before the publish completes.
+      publishFuture.get();
+      listener.failed(null, new CheckedApiException(Code.INTERNAL).underlying);
+      future.set(MessageMetadata.of(Partition.of(1), Offset.of(2)));
+      executorFuture.get();
+    } finally {
+      exec.shutdownNow();
+    }
+  }
+}
diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionLoaderTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionLoaderTest.java
new file mode 100644
index 000000000..436465f22
--- /dev/null
+++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionLoaderTest.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.example;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.TopicPath;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+
+/** Tests the partitions emitted by {@link SubscriptionPartitionLoader} for a mocked count. */
+@SuppressWarnings("initialization.fields.uninitialized")
+@RunWith(JUnit4.class)
+public class SubscriptionPartitionLoaderTest {
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+  @Mock SerializableFunction<TopicPath, Integer> getPartitionCount;
+  private SubscriptionPartitionLoader loader;
+
+  @Before
+  public void setUp() {
+    initMocks(this);
+    // Route calls through a FakeSerializable handle so the lambda captures the handle, not the mock.
+    FakeSerializable.Handle<SerializableFunction<TopicPath, Integer>> handle =
+        FakeSerializable.put(getPartitionCount);
+    loader =
+        new SubscriptionPartitionLoader(
+            example(TopicPath.class),
+            example(SubscriptionPath.class),
+            topic -> handle.get().apply(topic),
+            Duration.millis(50),
+            true);
+  }
+
+  @Test
+  public void singleResult() {
+    when(getPartitionCount.apply(example(TopicPath.class))).thenReturn(3);
+    PCollection<SubscriptionPartition> output = pipeline.apply(loader);
+    PAssert.that(output)
+        .containsInAnyOrder(
+            SubscriptionPartition.of(example(SubscriptionPath.class), Partition.of(0)),
+            SubscriptionPartition.of(example(SubscriptionPath.class), Partition.of(1)),
+            SubscriptionPartition.of(example(SubscriptionPath.class), Partition.of(2)));
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void addedResults() {
+    // The mocked count is 3 on the first call and 4 afterwards; partition 3 must appear once.
+    when(getPartitionCount.apply(example(TopicPath.class))).thenReturn(3).thenReturn(4);
+    PCollection<SubscriptionPartition> output = pipeline.apply(loader);
+    PAssert.that(output)
+        .containsInAnyOrder(
+            SubscriptionPartition.of(example(SubscriptionPath.class), Partition.of(0)),
+            SubscriptionPartition.of(example(SubscriptionPath.class), Partition.of(1)),
+            SubscriptionPartition.of(example(SubscriptionPath.class), Partition.of(2)),
+            SubscriptionPartition.of(example(SubscriptionPath.class), Partition.of(3)));
+    pipeline.run().waitUntilFinish();
+  }
+}
diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorImplTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorImplTest.java
new file mode 100644
index 000000000..cdca6c470
--- /dev/null
+++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionProcessorImplTest.java
@@ -0,0 +1,226 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import static com.google.cloud.pubsublite.beam.SubscriberOptions.DEFAULT_FLOW_CONTROL;
+import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.example;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.StatusCode.Code;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.internal.CheckedApiException;
+import com.google.cloud.pubsublite.internal.testing.FakeApiService;
+import com.google.cloud.pubsublite.internal.wire.Subscriber;
+import com.google.cloud.pubsublite.proto.Cursor;
+import com.google.cloud.pubsublite.proto.FlowControlRequest;
+import com.google.cloud.pubsublite.proto.SeekRequest;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import com.google.protobuf.util.Timestamps;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.Spy;
+
+@RunWith(JUnit4.class)
+@SuppressWarnings("initialization.fields.uninitialized")
+public class SubscriptionPartitionProcessorImplTest {
+  @Spy RestrictionTracker<OffsetRange, OffsetByteProgress> tracker;
+  @Mock OutputReceiver<SequencedMessage> receiver;
+  @Mock Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory;
+
+  abstract static class FakeSubscriber extends FakeApiService implements Subscriber {}
+
+  @Spy FakeSubscriber subscriber;
+  // Message callback handed to subscriberFactory; captured in setUp so tests can push batches.
+  Consumer<List<SequencedMessage>> leakedConsumer;
+  SubscriptionPartitionProcessor processor;
+
+  /** Builds a message at {@code offset} with size 1024 and publish time 10000 + offset millis. */
+  private static SequencedMessage messageWithOffset(long offset) {
+    SequencedMessage.Builder message = SequencedMessage.newBuilder().setSizeBytes(1024);
+    message.setCursor(Cursor.newBuilder().setOffset(offset));
+    message.setPublishTime(Timestamps.fromMillis(10000 + offset));
+    return message.build();
+  }
+
+  /** Creates the processor under test and captures the consumer it hands to the factory. */
+  @Before
+  public void setUp() {
+    initMocks(this);
+    when(subscriberFactory.apply(any()))
+        .then(
+            invocation -> {
+              leakedConsumer = invocation.getArgument(0);
+              return subscriber;
+            });
+    processor = new SubscriptionPartitionProcessorImpl(
+        tracker, receiver, subscriberFactory, DEFAULT_FLOW_CONTROL);
+    assertNotNull(leakedConsumer);
+  }
+
+  /** Checks the subscriber calls issued by start() and then by close(). */
+  @Test
+  public void lifecycle() throws Exception {
+    Offset initial = example(Offset.class);
+    when(tracker.currentRestriction())
+        .thenReturn(new OffsetRange(initial.value(), Long.MAX_VALUE));
+    when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(initial));
+    processor.start();
+    verify(subscriber).startAsync();
+    verify(subscriber).awaitRunning();
+    Cursor firstOffset = Cursor.newBuilder().setOffset(initial.value()).build();
+    verify(subscriber).seek(SeekRequest.newBuilder().setCursor(firstOffset).build());
+    verify(subscriber)
+        .allowFlow(
+            FlowControlRequest.newBuilder()
+                .setAllowedBytes(DEFAULT_FLOW_CONTROL.bytesOutstanding())
+                .setAllowedMessages(DEFAULT_FLOW_CONTROL.messagesOutstanding())
+                .build());
+    // close() is expected to stop the subscriber and wait for it to terminate.
+    processor.close();
+    verify(subscriber).stopAsync();
+    verify(subscriber).awaitTerminated();
+  }
+
+  // start() must fail if either setup call fails: seek (here) or allowFlow (next test).
+  @Test
+  public void lifecycleSeekThrows() throws Exception {
+    when(tracker.currentRestriction())
+        .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE));
+    when(subscriber.seek(any()))
+        .thenReturn(ApiFutures.immediateFailedFuture(new CheckedApiException(Code.OUT_OF_RANGE)));
+    assertThrows(CheckedApiException.class, () -> processor.start());
+  }
+
+  @Test
+  public void lifecycleFlowControlThrows() throws Exception {
+    when(tracker.currentRestriction())
+        .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE));
+    when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class)));
+    doThrow(new CheckedApiException(Code.OUT_OF_RANGE)).when(subscriber).allowFlow(any());
+    assertThrows(CheckedApiException.class, () -> processor.start());
+  }
+
+  @Test
+  public void lifecycleSubscriberAwaitThrows() throws Exception {
+    OffsetRange restriction = new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE);
+    when(tracker.currentRestriction()).thenReturn(restriction);
+    when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class)));
+    processor.start(); // awaitTerminated() is stubbed to fail only after a clean start
+    doThrow(new CheckedApiException(Code.INTERNAL).underlying).when(subscriber).awaitTerminated();
+    assertThrows(ApiException.class, () -> processor.close());
+    verify(subscriber).stopAsync();
+    verify(subscriber).awaitTerminated();
+  }
+
+  @Test
+  public void subscriberFailureFails() throws Exception {
+    OffsetRange restriction = new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE);
+    when(tracker.currentRestriction()).thenReturn(restriction);
+    when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class)));
+    processor.start();
+    subscriber.fail(new CheckedApiException(Code.OUT_OF_RANGE)); // fail after a clean start
+    ApiException thrown =
+        assertThrows(ApiException.class, () -> processor.waitForCompletion(Duration.ZERO));
+    assertEquals(Code.OUT_OF_RANGE, thrown.getStatusCode().getCode());
+  }
+
+  @Test
+  public void allowFlowFailureFails() throws Exception {
+    OffsetRange restriction = new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE);
+    when(tracker.currentRestriction()).thenReturn(restriction);
+    when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class)));
+    processor.start();
+    when(tracker.tryClaim(any())).thenReturn(true);
+    doThrow(new CheckedApiException(Code.OUT_OF_RANGE)).when(subscriber).allowFlow(any());
+    leakedConsumer.accept(ImmutableList.of(messageWithOffset(1))); // delivery triggers allowFlow
+    ApiException thrown =
+        assertThrows(ApiException.class, () -> processor.waitForCompletion(Duration.ZERO));
+    assertEquals(Code.OUT_OF_RANGE, thrown.getStatusCode().getCode());
+  }
+
+ @Test
+ public void timeoutReturnsResume() {
+ assertEquals(ProcessContinuation.resume(), processor.waitForCompletion(Duration.millis(10)));
+ assertFalse(processor.lastClaimed().isPresent()); // no batch was delivered, so nothing claimed
+ }
+
+  @Test
+  public void failedClaimCausesStop() {
+    when(tracker.tryClaim(any())).thenReturn(false);
+    leakedConsumer.accept(ImmutableList.of(messageWithOffset(1)));
+    verify(tracker).tryClaim(any());
+    assertEquals(ProcessContinuation.stop(), processor.waitForCompletion(Duration.millis(10)));
+    assertFalse(processor.lastClaimed().isPresent());
+    // After a rejected claim, later batches must not reach the tracker: still exactly one call.
+    leakedConsumer.accept(ImmutableList.of(messageWithOffset(2)));
+    verify(tracker, times(1)).tryClaim(any());
+  }
+
+  /** A claimed batch is emitted in order, its flow tokens are returned, and the wait resumes. */
+  @Test
+  public void successfulClaimThenTimeout() throws Exception {
+    when(tracker.tryClaim(any())).thenReturn(true);
+    SequencedMessage message1 = messageWithOffset(1);
+    SequencedMessage message3 = messageWithOffset(3);
+    long batchBytes = message1.getSizeBytes() + message3.getSizeBytes();
+    leakedConsumer.accept(ImmutableList.of(message1, message3));
+    InOrder order = inOrder(tracker, receiver, subscriber);
+    // One claim covers the whole batch: the last offset plus the summed byte size.
+    order.verify(tracker).tryClaim(OffsetByteProgress.of(Offset.of(3), batchBytes));
+    order
+        .verify(receiver)
+        .outputWithTimestamp(message1, new Instant(Timestamps.toMillis(message1.getPublishTime())));
+    order
+        .verify(receiver)
+        .outputWithTimestamp(message3, new Instant(Timestamps.toMillis(message3.getPublishTime())));
+    order
+        .verify(subscriber)
+        .allowFlow(
+            FlowControlRequest.newBuilder()
+                .setAllowedMessages(2)
+                .setAllowedBytes(batchBytes)
+                .build());
+    assertEquals(ProcessContinuation.resume(), processor.waitForCompletion(Duration.millis(10)));
+    assertEquals(Offset.of(3), processor.lastClaimed().get());
+  }
+}
diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderImplTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderImplTest.java
new file mode 100644
index 000000000..bb00a3353
--- /dev/null
+++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderImplTest.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.example;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.StatusCode.Code;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.internal.CheckedApiException;
+import com.google.cloud.pubsublite.internal.TopicStatsClient;
+import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
+import com.google.protobuf.Timestamp;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+@SuppressWarnings("uninitialized")
+@RunWith(JUnit4.class)
+public final class TopicBacklogReaderImplTest {
+
+  @Rule public final MockitoRule mockito = MockitoJUnit.rule();
+
+  @Mock TopicStatsClient mockClient;
+
+  private TopicBacklogReader reader;
+
+  @Before
+  public void setUp() {
+    // mockClient is already populated by the MockitoRule, so no explicit initMocks call is needed.
+    this.reader =
+        new TopicBacklogReaderImpl(mockClient, example(TopicPath.class), example(Partition.class));
+  }
+
+  /** A failed stats future should reach the caller as an ApiException with the same code. */
+  @SuppressWarnings("incompatible")
+  @Test
+  public void computeMessageStats_failure() {
+    Throwable unavailable = new CheckedApiException(Code.UNAVAILABLE).underlying;
+    when(mockClient.computeMessageStats(
+            example(TopicPath.class),
+            example(Partition.class),
+            example(Offset.class),
+            Offset.of(Integer.MAX_VALUE)))
+        .thenReturn(ApiFutures.immediateFailedFuture(unavailable));
+    ApiException thrown =
+        assertThrows(ApiException.class, () -> reader.computeMessageStats(example(Offset.class)));
+    assertEquals(Code.UNAVAILABLE, thrown.getStatusCode().getCode());
+  }
+
+  /** A successful stats response from the client is handed back to the caller unchanged. */
+  @Test
+  public void computeMessageStats_validResponseCached() {
+    Timestamp minEventTime = Timestamp.newBuilder().setSeconds(1002).setNanos(10).build();
+    Timestamp minPublishTime = Timestamp.newBuilder().setSeconds(1001).setNanos(11).build();
+    ComputeMessageStatsResponse response =
+        ComputeMessageStatsResponse.newBuilder()
+            .setMessageCount(10)
+            .setMessageBytes(100)
+            .setMinimumEventTime(minEventTime)
+            .setMinimumPublishTime(minPublishTime)
+            .build();
+    when(mockClient.computeMessageStats(
+            example(TopicPath.class),
+            example(Partition.class),
+            example(Offset.class),
+            Offset.of(Integer.MAX_VALUE)))
+        .thenReturn(ApiFutures.immediateFuture(response));
+    // NOTE(review): despite the name, only one call is made, so caching is not exercised here.
+    assertEquals(response, reader.computeMessageStats(example(Offset.class)));
+  }
+}
diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/UuidDeduplicationTransformTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/UuidDeduplicationTransformTest.java
new file mode 100644
index 000000000..ec05cdca6
--- /dev/null
+++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/UuidDeduplicationTransformTest.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.beam;
+
+import com.google.cloud.pubsublite.proto.AttributeValues;
+import com.google.cloud.pubsublite.proto.Cursor;
+import com.google.cloud.pubsublite.proto.PubSubMessage;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.util.Timestamps;
+import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.Deduplicate;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class UuidDeduplicationTransformTest {
+  @Rule public final TestPipeline pipeline = TestPipeline.create();
+  private static final Instant START = new Instant(0);
+
+  /** Returns a message whose default uuid attribute holds a freshly generated random Uuid. */
+  private static SequencedMessage newMessage() {
+    AttributeValues uuidAttribute =
+        AttributeValues.newBuilder().addValues(Uuid.random().value()).build();
+    PubSubMessage.Builder payload =
+        PubSubMessage.newBuilder().putAttributes(Uuid.DEFAULT_ATTRIBUTE, uuidAttribute);
+    return SequencedMessage.newBuilder()
+        .setMessage(payload)
+        .setSizeBytes(10000)
+        .setPublishTime(Timestamps.EPOCH)
+        .setCursor(Cursor.newBuilder().setOffset(10))
+        .build();
+  }
+
+  /** Two messages carrying different uuids both reach the output. */
+  @Test
+  public void unrelatedUuidsProxied() {
+    SequencedMessage message1 = newMessage();
+    SequencedMessage message2 = newMessage();
+    TestStream<SequencedMessage> messageStream =
+        TestStream.create(ProtoCoder.of(SequencedMessage.class))
+            .advanceWatermarkTo(START)
+            .addElements(message1)
+            .advanceWatermarkTo(START.plus(Deduplicate.DEFAULT_DURATION.dividedBy(2)))
+            .addElements(message2)
+            .advanceWatermarkToInfinity();
+    PCollection<SequencedMessage> results =
+        pipeline
+            .apply(messageStream)
+            .apply(new UuidDeduplicationTransform(UuidDeduplicationOptions.newBuilder().build()));
+    PAssert.that(results).containsInAnyOrder(message1, message2);
+    pipeline.run();
+  }
+
+  /** The same message delivered twice inside the deduplication window is emitted only once. */
+  @Test
+  public void sameUuidsWithinWindowOnlyOne() {
+    SequencedMessage message = newMessage();
+    TestStream<SequencedMessage> messageStream =
+        TestStream.create(ProtoCoder.of(SequencedMessage.class))
+            .advanceWatermarkTo(START)
+            .addElements(message)
+            .advanceWatermarkTo(START.plus(Deduplicate.DEFAULT_DURATION.dividedBy(2)))
+            .addElements(message) // the duplicate, arriving while the first is still remembered
+            .advanceWatermarkToInfinity();
+    PCollection<SequencedMessage> results =
+        pipeline.apply(messageStream)
+            .apply(new UuidDeduplicationTransform(UuidDeduplicationOptions.newBuilder().build()));
+    PAssert.that(results).containsInAnyOrder(message);
+    pipeline.run();
+  }
+
+  /** A repeat that arrives after the deduplication duration has elapsed is emitted again. */
+  @Test
+  public void sameUuidsAfterGcOutsideWindowHasBoth() {
+    SequencedMessage message1 = newMessage();
+    TestStream<SequencedMessage> messageStream =
+        TestStream.create(ProtoCoder.of(SequencedMessage.class))
+            .advanceWatermarkTo(START)
+            .addElements(message1)
+            .advanceWatermarkTo(
+                START.plus(
+                    UuidDeduplicationOptions.DEFAULT_DEDUPLICATE_DURATION.plus(Duration.millis(1))))
+            .addElements(message1)
+            .advanceWatermarkToInfinity();
+    PCollection<SequencedMessage> results =
+        pipeline
+            .apply(messageStream)
+            .apply(new UuidDeduplicationTransform(UuidDeduplicationOptions.newBuilder().build()));
+    PAssert.that(results).containsInAnyOrder(message1, message1);
+    pipeline.run();
+  }
+
+  /** Deduplication uses the uuid returned by the configured extractor. */
+  @Test
+  public void dedupesBasedOnReturnedUuid() {
+    byte[] bytes = {0x23, 0x56};
+    // These messages have different uuids, so they would both appear in the output collection if
+    // the extractor is not respected.
+    SequencedMessage message1 = newMessage();
+    SequencedMessage message2 = newMessage();
+    TestStream<SequencedMessage> messageStream =
+        TestStream.create(ProtoCoder.of(SequencedMessage.class))
+            .advanceWatermarkTo(START)
+            .addElements(message1, message2)
+            .advanceWatermarkToInfinity();
+    PCollection<SequencedMessage> results =
+        pipeline
+            .apply(messageStream)
+            .apply(
+                new UuidDeduplicationTransform(
+                    UuidDeduplicationOptions.newBuilder()
+                        .setUuidExtractor(message -> Uuid.of(ByteString.copyFrom(bytes)))
+                        .build()));
+    PAssert.that(results)
+        .satisfies(
+            messages -> {
+              Preconditions.checkArgument(Iterables.size(messages) == 1);
+              return null;
+            });
+    pipeline.run();
+  }
+}