diff --git a/clirr-ignored-differences.xml b/clirr-ignored-differences.xml
index 2921d7b6..13eef796 100644
--- a/clirr-ignored-differences.xml
+++ b/clirr-ignored-differences.xml
@@ -1,6 +1,11 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
 <differences>
+  <difference>
+    <differenceType>7004</differenceType>
+    <className>com/google/cloud/pubsublite/spark/internal/*</className>
+    <method>*</method>
+  </difference>
   <difference>
     <differenceType>7004</differenceType>
     <className>com/google/cloud/pubsublite/spark/*Reader</className>
@@ -13,16 +18,18 @@
     <method>*</method>
   </difference>
   <difference>
-    <differenceType>8001</differenceType>
-    <className>com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader</className>
+    <differenceType>7005</differenceType>
+    <className>com/google/cloud/pubsublite/spark/*InputPartition</className>
+    <method>*</method>
+    <to>*</to>
   </difference>
   <difference>
     <differenceType>8001</differenceType>
-    <className>com/google/cloud/pubsublite/spark/MultiPartitionCommitter*</className>
+    <className>com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader</className>
   </difference>
   <difference>
     <differenceType>8001</differenceType>
-    <className>com/google/cloud/pubsublite/spark/PartitionSubscriberFactory</className>
+    <className>com/google/cloud/pubsublite/spark/MultiPartitionCommitter*</className>
   </difference>
   <difference>
     <differenceType>8001</differenceType>
diff --git a/pom.xml b/pom.xml
index a857bf85..9c1bfd35 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,7 +43,7 @@
     <dependency>
       <groupId>com.google.cloud</groupId>
       <artifactId>google-cloud-pubsublite</artifactId>
-      <version>0.14.2</version>
+      <version>0.15.0</version>
     </dependency>
     <dependency>
       <groupId>com.google.api.grpc</groupId>
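
Note: the new clirr entries whitelist deliberate binary-incompatible changes in this patch: 7004 ("number of arguments changed") for everything under spark/internal/*, and 7005 ("parameter type changed") for the *InputPartition constructors, alongside the existing 8001 ("class removed") entries. A minimal sketch of the kind of break code 7004 flags, using hypothetical stand-in types rather than the connector's real ones:

```java
// Hypothetical stand-ins, not connector code: clirr 7004 fires because a
// method's parameter list changed, so callers compiled against the old
// interface fail to link until they are rebuilt.
interface OldFactory {
  String newSubscriber(String partition); // old shape
}

interface NewFactory {
  String newSubscriber(String partition, long offset); // new shape: extra offset argument
}

class ClirrSketch {
  public static void main(String[] args) {
    // Source-level callers simply pass the extra argument.
    NewFactory factory = (partition, offset) -> partition + "@" + offset;
    System.out.println(factory.newSubscriber("partition-0", 42L));
  }
}
```
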
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartition.java b/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartition.java
index c65ae80b..00fcc4eb 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartition.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartition.java
@@ -22,9 +22,7 @@
 import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
 import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
 import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
-import com.google.cloud.pubsublite.proto.Cursor;
-import com.google.cloud.pubsublite.proto.SeekRequest;
+import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
 import java.io.Serializable;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.reader.ContinuousInputPartition;
@@ -34,13 +32,13 @@
 public class PslContinuousInputPartition
     implements ContinuousInputPartition<InternalRow>, Serializable {
 
-  private final SubscriberFactory subscriberFactory;
+  private final PartitionSubscriberFactory subscriberFactory;
   private final SparkPartitionOffset startOffset;
   private final SubscriptionPath subscriptionPath;
   private final FlowControlSettings flowControlSettings;
 
   public PslContinuousInputPartition(
-      SubscriberFactory subscriberFactory,
+      PartitionSubscriberFactory subscriberFactory,
       SparkPartitionOffset startOffset,
       SubscriptionPath subscriptionPath,
       FlowControlSettings flowControlSettings) {
@@ -63,12 +61,10 @@ public InputPartitionReader<InternalRow> createContinuousReader(PartitionOffset
     try {
       subscriber =
           new BlockingPullSubscriberImpl(
-              subscriberFactory,
-              flowControlSettings,
-              SeekRequest.newBuilder()
-                  .setCursor(
-                      Cursor.newBuilder().setOffset(pslPartitionOffset.offset().value()).build())
-                  .build());
+              (consumer) ->
+                  subscriberFactory.newSubscriber(
+                      pslPartitionOffset.partition(), pslPartitionOffset.offset(), consumer),
+              flowControlSettings);
     } catch (CheckedApiException e) {
       throw new IllegalStateException(
           "Unable to create PSL subscriber for " + startOffset.toString(), e);
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java b/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java
index d984b174..7f01d789 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java
@@ -21,7 +21,6 @@
 import com.google.cloud.pubsublite.SubscriptionPath;
 import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
 import com.google.cloud.pubsublite.internal.CursorClient;
-import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
 import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter;
 import com.google.cloud.pubsublite.spark.internal.PartitionCountReader;
 import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
@@ -116,12 +115,9 @@ public StructType readSchema() {
   public List<InputPartition<InternalRow>> planInputPartitions() {
     List<InputPartition<InternalRow>> list = new ArrayList<>();
     for (SparkPartitionOffset offset : startOffset.getPartitionOffsetMap().values()) {
-      PartitionSubscriberFactory partitionSubscriberFactory = this.partitionSubscriberFactory;
-      SubscriberFactory subscriberFactory =
-          (consumer) -> partitionSubscriberFactory.newSubscriber(offset.partition(), consumer);
       list.add(
           new PslContinuousInputPartition(
-              subscriberFactory,
+              partitionSubscriberFactory,
               SparkPartitionOffset.builder()
                   .partition(offset.partition())
                   .offset(offset.offset())
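
Note: the continuous reader now hands the Serializable PartitionSubscriberFactory itself to each input partition (the micro-batch reader below gets the same treatment), and the partition binds partition and offset only when the reader is created on the executor, currying them into the one-argument factory that BlockingPullSubscriberImpl expects. A minimal sketch of that late binding, with simplified stand-in types (the real SubscriberFactory, Partition, and Offset types come from google-cloud-pubsublite):

```java
import java.io.Serializable;
import java.util.function.Consumer;

public class LateBindingSketch {
  // Stand-in for the connector's PartitionSubscriberFactory: can build a
  // subscriber for any (partition, offset) pair, and is Serializable so it
  // can ship inside an InputPartition to executors.
  interface PartitionSubscriberFactory extends Serializable {
    Runnable newSubscriber(long partition, long offset, Consumer<String> consumer);
  }

  // Stand-in for the wire-level factory BlockingPullSubscriberImpl consumes:
  // partition and offset are already bound.
  interface SubscriberFactory {
    Runnable newSubscriber(Consumer<String> consumer);
  }

  public static void main(String[] args) {
    PartitionSubscriberFactory factory =
        (partition, offset, consumer) ->
            () -> consumer.accept("subscriber for partition " + partition + " at offset " + offset);

    // What createContinuousReader() now does: bind the partition and offset
    // late, on the executor, with a lambda over the serializable factory.
    long partition = 2L;
    long offset = 100L;
    SubscriberFactory bound = consumer -> factory.newSubscriber(partition, offset, consumer);

    bound.newSubscriber(System.out::println).run();
  }
}
```
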
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartition.java b/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartition.java
index 42173680..e2dc8fa8 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartition.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartition.java
@@ -21,16 +21,14 @@
 import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
 import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
 import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
-import com.google.cloud.pubsublite.proto.Cursor;
-import com.google.cloud.pubsublite.proto.SeekRequest;
+import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 
 public class PslMicroBatchInputPartition implements InputPartition<InternalRow> {
 
-  private final SubscriberFactory subscriberFactory;
+  private final PartitionSubscriberFactory subscriberFactory;
   private final SparkPartitionOffset startOffset;
   private final SparkPartitionOffset endOffset;
   private final SubscriptionPath subscriptionPath;
@@ -41,7 +39,7 @@ public PslMicroBatchInputPartition(
       FlowControlSettings flowControlSettings,
       SparkPartitionOffset startOffset,
       SparkPartitionOffset endOffset,
-      SubscriberFactory subscriberFactory) {
+      PartitionSubscriberFactory subscriberFactory) {
     this.startOffset = startOffset;
     this.endOffset = endOffset;
     this.subscriptionPath = subscriptionPath;
@@ -53,17 +51,13 @@ public PslMicroBatchInputPartition(
   public InputPartitionReader<InternalRow> createPartitionReader() {
     BlockingPullSubscriber subscriber;
     try {
+      PslPartitionOffset pslStartOffset = PslSparkUtils.toPslPartitionOffset(startOffset);
       subscriber =
           new BlockingPullSubscriberImpl(
-              subscriberFactory,
-              flowControlSettings,
-              SeekRequest.newBuilder()
-                  .setCursor(
-                      Cursor.newBuilder()
-                          .setOffset(
-                              PslSparkUtils.toPslPartitionOffset(startOffset).offset().value())
-                          .build())
-                  .build());
+              (consumer) ->
+                  subscriberFactory.newSubscriber(
+                      pslStartOffset.partition(), pslStartOffset.offset(), consumer),
+              flowControlSettings);
     } catch (CheckedApiException e) {
       throw new IllegalStateException(
           "Unable to create PSL subscriber for " + endOffset.partition(), e);
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java b/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java
index d526526a..10662230 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java
@@ -23,7 +23,6 @@
 import com.google.cloud.pubsublite.SubscriptionPath;
 import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
 import com.google.cloud.pubsublite.internal.CursorClient;
-import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
 import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter;
 import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
 import com.google.cloud.pubsublite.spark.internal.PerTopicHeadOffsetReader;
@@ -145,17 +144,13 @@ public List<InputPartition<InternalRow>> planInputPartitions() {
         // There is no message to pull for this partition.
         continue;
       }
-      PartitionSubscriberFactory partitionSubscriberFactory = this.partitionSubscriberFactory;
-      SubscriberFactory subscriberFactory =
-          (consumer) ->
-              partitionSubscriberFactory.newSubscriber(endPartitionOffset.partition(), consumer);
       list.add(
           new PslMicroBatchInputPartition(
               subscriptionPath,
               flowControlSettings,
               startPartitionOffset,
               endPartitionOffset,
-              subscriberFactory));
+              partitionSubscriberFactory));
     }
     return list;
   }
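
Note: createPartitionReader() now performs the Spark-to-PSL offset conversion once, before building the lambda, instead of inline in the removed SeekRequest. A sketch of what that conversion means, with simplified stand-in types; the +1 reflects the assumption that a SparkPartitionOffset names the last record already delivered while a PSL cursor names the next record to deliver (PslSparkUtils holds the authoritative rule):

```java
public class OffsetConversionSketch {
  // Simplified stand-ins for the connector's SparkPartitionOffset and
  // PslPartitionOffset value types.
  record SparkPartitionOffset(long partition, long offset) {}

  record PslPartitionOffset(long partition, long offset) {}

  // Assumed semantics for illustration only: Spark tracks the last delivered
  // offset; the PSL cursor points at the next offset to deliver.
  static PslPartitionOffset toPslPartitionOffset(SparkPartitionOffset spark) {
    return new PslPartitionOffset(spark.partition(), spark.offset() + 1);
  }

  public static void main(String[] args) {
    SparkPartitionOffset start = new SparkPartitionOffset(0L, 41L);
    // Hoisted out of the subscriber-factory lambda, as in the patch, the
    // conversion runs once rather than on every factory invocation.
    PslPartitionOffset pslStart = toPslPartitionOffset(start);
    System.out.println("PSL cursor starts at offset " + pslStart.offset()); // 42
  }
}
```
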
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslReadDataSourceOptions.java b/src/main/java/com/google/cloud/pubsublite/spark/PslReadDataSourceOptions.java
index f5987788..aaca0ccb 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/PslReadDataSourceOptions.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslReadDataSourceOptions.java
@@ -33,6 +33,8 @@
 import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
 import com.google.cloud.pubsublite.internal.wire.ServiceClients;
 import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
+import com.google.cloud.pubsublite.proto.Cursor;
+import com.google.cloud.pubsublite.proto.SeekRequest;
 import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter;
 import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitterImpl;
 import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
@@ -135,7 +137,7 @@ MultiPartitionCommitter newMultiPartitionCommitter(long topicPartitionCount) {
   }
 
   PartitionSubscriberFactory getSubscriberFactory() {
-    return (partition, consumer) -> {
+    return (partition, offset, consumer) -> {
       PubsubContext context = PubsubContext.of(Constants.FRAMEWORK);
       SubscriberServiceSettings.Builder settingsBuilder =
           SubscriberServiceSettings.newBuilder()
@@ -151,6 +153,10 @@ PartitionSubscriberFactory getSubscriberFactory() {
             .setPartition(partition)
             .setServiceClient(serviceClient)
             .setMessageConsumer(consumer)
+            .setInitialLocation(
+                SeekRequest.newBuilder()
+                    .setCursor(Cursor.newBuilder().setOffset(offset.value()))
+                    .build())
             .build();
       } catch (IOException e) {
         throw new IllegalStateException("Failed to create subscriber service.", e);
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/internal/PartitionSubscriberFactory.java b/src/main/java/com/google/cloud/pubsublite/spark/internal/PartitionSubscriberFactory.java
index d7a16257..8ef57d3b 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/internal/PartitionSubscriberFactory.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/internal/PartitionSubscriberFactory.java
@@ -17,6 +17,7 @@
 package com.google.cloud.pubsublite.spark.internal;
 
 import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.Offset;
 import com.google.cloud.pubsublite.Partition;
 import com.google.cloud.pubsublite.SequencedMessage;
 import com.google.cloud.pubsublite.internal.wire.Subscriber;
@@ -26,6 +27,8 @@
 
 public interface PartitionSubscriberFactory extends Serializable {
   Subscriber newSubscriber(
-      Partition partition, Consumer<Collection<SequencedMessage>> message_consumer)
+      Partition partition,
+      Offset offset,
+      Consumer<Collection<SequencedMessage>> message_consumer)
       throws ApiException;
 }
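
Note: the net effect of the refactor is that seeking no longer happens inside BlockingPullSubscriberImpl at pull time; the initial cursor is baked into the wire subscriber when getSubscriberFactory() builds it. The SeekRequest uses the same proto builders the old call sites used; a minimal sketch of constructing one for a given start offset:

```java
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SeekRequest;

public class InitialLocationSketch {
  // Builds the SeekRequest that the factory now hands to
  // SubscriberBuilder.setInitialLocation(...): an explicit cursor seek.
  static SeekRequest initialLocation(Offset offset) {
    return SeekRequest.newBuilder()
        .setCursor(Cursor.newBuilder().setOffset(offset.value()))
        .build();
  }

  public static void main(String[] args) {
    // Offset.of(42) corresponds to the PSL cursor produced from a Spark
    // offset of 41 in the micro-batch path above.
    System.out.println(initialLocation(Offset.of(42L)));
  }
}
```
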