deps: update to google-cloud-pubsublite to v0.15.0 (#192)
Manual update to handle the interface change in BlockingPullSubscriberImpl: the SubscriberBuilder now accepts an initial location when connecting a new subscribe stream, so an additional seek request is no longer necessary.
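
In essence, the construction pattern changes as follows (a minimal before/after sketch distilled from the diffs below; subscriberFactory, partition, offset, and flowControlSettings stand in for the connector's actual fields):

    // Before (v0.14.2): connect, then position the stream with an explicit SeekRequest.
    BlockingPullSubscriber subscriber =
        new BlockingPullSubscriberImpl(
            subscriberFactory,
            flowControlSettings,
            SeekRequest.newBuilder()
                .setCursor(Cursor.newBuilder().setOffset(offset.value()).build())
                .build());

    // After (v0.15.0): the factory receives the start offset up front, so the
    // subscribe stream opens at that location and no separate seek is issued.
    BlockingPullSubscriber subscriber =
        new BlockingPullSubscriberImpl(
            (consumer) -> subscriberFactory.newSubscriber(partition, offset, consumer),
            flowControlSettings);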
tmdiep committed Jun 10, 2021
1 parent 7d1ac2f commit 002191f
Showing 8 changed files with 40 additions and 43 deletions.
15 changes: 11 additions & 4 deletions clirr-ignored-differences.xml
@@ -1,6 +1,11 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
 <differences>
+  <difference>
+    <differenceType>7004</differenceType>
+    <className>com/google/cloud/pubsublite/spark/internal/*</className>
+    <method>*</method>
+  </difference>
   <difference>
     <differenceType>7004</differenceType>
     <className>com/google/cloud/pubsublite/spark/*Reader</className>
@@ -13,16 +18,18 @@
     <to>*</to>
   </difference>
   <difference>
-    <differenceType>8001</differenceType>
-    <className>com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader</className>
+    <differenceType>7005</differenceType>
+    <className>com/google/cloud/pubsublite/spark/*InputPartition</className>
+    <method>*</method>
+    <to>*</to>
   </difference>
   <difference>
     <differenceType>8001</differenceType>
-    <className>com/google/cloud/pubsublite/spark/MultiPartitionCommitter*</className>
+    <className>com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader</className>
   </difference>
   <difference>
     <differenceType>8001</differenceType>
-    <className>com/google/cloud/pubsublite/spark/PartitionSubscriberFactory</className>
+    <className>com/google/cloud/pubsublite/spark/MultiPartitionCommitter*</className>
   </difference>
   <difference>
     <differenceType>8001</differenceType>
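(For reference, the clirr difference types ignored above are 7004 = method argument count changed, 7005 = method argument type changed, and 8001 = class removed; the new 7004 internal/* and 7005 *InputPartition entries cover the signature changes made in this commit.)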
2 changes: 1 addition & 1 deletion pom.xml
@@ -43,7 +43,7 @@
     <dependency>
       <groupId>com.google.cloud</groupId>
       <artifactId>google-cloud-pubsublite</artifactId>
-      <version>0.14.2</version>
+      <version>0.15.0</version>
     </dependency>
     <dependency>
       <groupId>com.google.api.grpc</groupId>
18 changes: 7 additions & 11 deletions src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartition.java
@@ -22,9 +22,7 @@
 import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
 import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
 import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
-import com.google.cloud.pubsublite.proto.Cursor;
-import com.google.cloud.pubsublite.proto.SeekRequest;
+import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
 import java.io.Serializable;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.reader.ContinuousInputPartition;
@@ -34,13 +32,13 @@
 public class PslContinuousInputPartition
     implements ContinuousInputPartition<InternalRow>, Serializable {
 
-  private final SubscriberFactory subscriberFactory;
+  private final PartitionSubscriberFactory subscriberFactory;
   private final SparkPartitionOffset startOffset;
   private final SubscriptionPath subscriptionPath;
   private final FlowControlSettings flowControlSettings;
 
   public PslContinuousInputPartition(
-      SubscriberFactory subscriberFactory,
+      PartitionSubscriberFactory subscriberFactory,
       SparkPartitionOffset startOffset,
       SubscriptionPath subscriptionPath,
       FlowControlSettings flowControlSettings) {
@@ -63,12 +61,10 @@ public InputPartitionReader<InternalRow> createContinuousReader(PartitionOffset
     try {
       subscriber =
           new BlockingPullSubscriberImpl(
-              subscriberFactory,
-              flowControlSettings,
-              SeekRequest.newBuilder()
-                  .setCursor(
-                      Cursor.newBuilder().setOffset(pslPartitionOffset.offset().value()).build())
-                  .build());
+              (consumer) ->
+                  subscriberFactory.newSubscriber(
+                      pslPartitionOffset.partition(), pslPartitionOffset.offset(), consumer),
+              flowControlSettings);
     } catch (CheckedApiException e) {
       throw new IllegalStateException(
           "Unable to create PSL subscriber for " + startOffset.toString(), e);
6 changes: 1 addition & 5 deletions src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java
@@ -21,7 +21,6 @@
 import com.google.cloud.pubsublite.SubscriptionPath;
 import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
 import com.google.cloud.pubsublite.internal.CursorClient;
-import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
 import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter;
 import com.google.cloud.pubsublite.spark.internal.PartitionCountReader;
 import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
@@ -116,12 +115,9 @@ public StructType readSchema() {
   public List<InputPartition<InternalRow>> planInputPartitions() {
     List<InputPartition<InternalRow>> list = new ArrayList<>();
     for (SparkPartitionOffset offset : startOffset.getPartitionOffsetMap().values()) {
-      PartitionSubscriberFactory partitionSubscriberFactory = this.partitionSubscriberFactory;
-      SubscriberFactory subscriberFactory =
-          (consumer) -> partitionSubscriberFactory.newSubscriber(offset.partition(), consumer);
       list.add(
           new PslContinuousInputPartition(
-              subscriberFactory,
+              partitionSubscriberFactory,
               SparkPartitionOffset.builder()
                   .partition(offset.partition())
                   .offset(offset.offset())
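Note: the readers no longer pre-bind the partition into a wire SubscriberFactory lambda. They now pass the PartitionSubscriberFactory itself to the input partition, which binds the partition and start offset when it creates its reader; the micro-batch reader below gets the same treatment.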
22 changes: 8 additions & 14 deletions src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartition.java
@@ -21,16 +21,14 @@
 import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
 import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
 import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
-import com.google.cloud.pubsublite.proto.Cursor;
-import com.google.cloud.pubsublite.proto.SeekRequest;
+import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 
 public class PslMicroBatchInputPartition implements InputPartition<InternalRow> {
 
-  private final SubscriberFactory subscriberFactory;
+  private final PartitionSubscriberFactory subscriberFactory;
   private final SparkPartitionOffset startOffset;
   private final SparkPartitionOffset endOffset;
   private final SubscriptionPath subscriptionPath;
@@ -41,7 +39,7 @@ public PslMicroBatchInputPartition(
       FlowControlSettings flowControlSettings,
       SparkPartitionOffset startOffset,
       SparkPartitionOffset endOffset,
-      SubscriberFactory subscriberFactory) {
+      PartitionSubscriberFactory subscriberFactory) {
     this.startOffset = startOffset;
     this.endOffset = endOffset;
     this.subscriptionPath = subscriptionPath;
@@ -53,17 +51,13 @@ public PslMicroBatchInputPartition(
   public InputPartitionReader<InternalRow> createPartitionReader() {
     BlockingPullSubscriber subscriber;
     try {
+      PslPartitionOffset pslStartOffset = PslSparkUtils.toPslPartitionOffset(startOffset);
       subscriber =
           new BlockingPullSubscriberImpl(
-              subscriberFactory,
-              flowControlSettings,
-              SeekRequest.newBuilder()
-                  .setCursor(
-                      Cursor.newBuilder()
-                          .setOffset(
-                              PslSparkUtils.toPslPartitionOffset(startOffset).offset().value())
-                          .build())
-                  .build());
+              (consumer) ->
+                  subscriberFactory.newSubscriber(
+                      pslStartOffset.partition(), pslStartOffset.offset(), consumer),
+              flowControlSettings);
     } catch (CheckedApiException e) {
       throw new IllegalStateException(
           "Unable to create PSL subscriber for " + endOffset.partition(), e);
7 changes: 1 addition & 6 deletions src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java
@@ -23,7 +23,6 @@
 import com.google.cloud.pubsublite.SubscriptionPath;
 import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
 import com.google.cloud.pubsublite.internal.CursorClient;
-import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
 import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter;
 import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
 import com.google.cloud.pubsublite.spark.internal.PerTopicHeadOffsetReader;
@@ -145,17 +144,13 @@ public List<InputPartition<InternalRow>> planInputPartitions() {
         // There is no message to pull for this partition.
         continue;
       }
-      PartitionSubscriberFactory partitionSubscriberFactory = this.partitionSubscriberFactory;
-      SubscriberFactory subscriberFactory =
-          (consumer) ->
-              partitionSubscriberFactory.newSubscriber(endPartitionOffset.partition(), consumer);
       list.add(
           new PslMicroBatchInputPartition(
               subscriptionPath,
               flowControlSettings,
               startPartitionOffset,
               endPartitionOffset,
-              subscriberFactory));
+              partitionSubscriberFactory));
     }
     return list;
   }
8 changes: 7 additions & 1 deletion src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java
@@ -33,6 +33,8 @@
 import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
 import com.google.cloud.pubsublite.internal.wire.ServiceClients;
 import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
+import com.google.cloud.pubsublite.proto.Cursor;
+import com.google.cloud.pubsublite.proto.SeekRequest;
 import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter;
 import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitterImpl;
 import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
@@ -135,7 +137,7 @@ MultiPartitionCommitter newMultiPartitionCommitter(long topicPartitionCount) {
   }
 
   PartitionSubscriberFactory getSubscriberFactory() {
-    return (partition, consumer) -> {
+    return (partition, offset, consumer) -> {
       PubsubContext context = PubsubContext.of(Constants.FRAMEWORK);
       SubscriberServiceSettings.Builder settingsBuilder =
           SubscriberServiceSettings.newBuilder()
@@ -151,6 +153,10 @@ PartitionSubscriberFactory getSubscriberFactory() {
             .setPartition(partition)
             .setServiceClient(serviceClient)
             .setMessageConsumer(consumer)
+            .setInitialLocation(
+                SeekRequest.newBuilder()
+                    .setCursor(Cursor.newBuilder().setOffset(offset.value()))
+                    .build())
             .build();
       } catch (IOException e) {
         throw new IllegalStateException("Failed to create subscriber service.", e);
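With the initial location baked into the SubscriberBuilder, callers obtain a correctly positioned subscriber in one step. A hypothetical usage sketch (illustrative only; options stands for a configured instance of the options class above, and handleMessages is not part of the repo):

    PartitionSubscriberFactory factory = options.getSubscriberFactory();
    // The subscribe stream opens already positioned at offset 42 of partition 0;
    // no follow-up SeekRequest round trip is needed.
    Subscriber subscriber =
        factory.newSubscriber(Partition.of(0), Offset.of(42), messages -> handleMessages(messages));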
5 changes: 4 additions & 1 deletion src/main/java/com/google/cloud/pubsublite/spark/internal/PartitionSubscriberFactory.java
@@ -17,6 +17,7 @@
 package com.google.cloud.pubsublite.spark.internal;
 
 import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.Offset;
 import com.google.cloud.pubsublite.Partition;
 import com.google.cloud.pubsublite.SequencedMessage;
 import com.google.cloud.pubsublite.internal.wire.Subscriber;
@@ -26,6 +27,8 @@

 public interface PartitionSubscriberFactory extends Serializable {
   Subscriber newSubscriber(
-      Partition partition, Consumer<ImmutableList<SequencedMessage>> message_consumer)
+      Partition partition,
+      Offset offset,
+      Consumer<ImmutableList<SequencedMessage>> message_consumer)
       throws ApiException;
 }
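Note: PartitionSubscriberFactory remains a single-abstract-method interface extending Serializable, so it can still be supplied as a lambda, as getSubscriberFactory() above does; the only change is that implementations now receive the start Offset alongside the Partition and the message consumer.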
