diff --git a/src/main/java/com/google/cloud/pubsublite/spark/Constants.java b/src/main/java/com/google/cloud/pubsublite/spark/Constants.java
index bdc8a1bd..cac4337a 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/Constants.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/Constants.java
@@ -25,7 +25,7 @@ public class Constants {
   public static long DEFAULT_BYTES_OUTSTANDING = 50_000_000;
   public static long DEFAULT_MESSAGES_OUTSTANDING = Long.MAX_VALUE;
-  public static int DEFAULT_BATCH_OFFSET_RANGE = 100_000;
+  public static long DEFAULT_MAX_MESSAGES_PER_BATCH = Long.MAX_VALUE;
   public static StructType DEFAULT_SCHEMA =
       new StructType(
           new StructField[] {
@@ -46,6 +46,8 @@ public class Constants {
 
   public static final PubsubContext.Framework FRAMEWORK = PubsubContext.Framework.of("SPARK");
 
+  public static String MAX_MESSAGE_PER_BATCH_CONFIG_KEY =
+      "pubsublite.flowcontrol.maxmessagesperbatch";
   public static String BYTES_OUTSTANDING_CONFIG_KEY =
       "pubsublite.flowcontrol.byteoutstandingperpartition";
   public static String MESSAGES_OUTSTANDING_CONFIG_KEY =
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java b/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java
index 0ac68727..b07122c4 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java
@@ -98,6 +98,7 @@ public MicroBatchReader createMicroBatchReader(
             Ticker.systemTicker()),
         subscriptionPath,
         Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
+        pslDataSourceOptions.maxMessagesPerBatch(),
         topicPartitionCount);
   }
 }
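
For orientation, a minimal sketch of how a Spark job would opt into the new cap. The "pubsublite" format name and the "pubsublite.subscription" option key are assumed from the connector's documentation, and the project, location, and subscription names are placeholders:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class ReadWithBatchCap {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("psl-read").getOrCreate();

        // Cap each partition at 100,000 messages per micro-batch. Without the
        // option, DEFAULT_MAX_MESSAGES_PER_BATCH (Long.MAX_VALUE) applies and
        // batches are bounded only by the head offset.
        Dataset<Row> df =
            spark
                .readStream()
                .format("pubsublite")
                .option(
                    "pubsublite.subscription",
                    "projects/123456/locations/us-central1-a/subscriptions/example-sub")
                .option("pubsublite.flowcontrol.maxmessagesperbatch", "100000")
                .load();
      }
    }
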
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java b/src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java
index 8db019fa..3a390eaf 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java
@@ -42,7 +42,6 @@
 import com.google.cloud.pubsublite.v1.TopicStatsServiceSettings;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Optional;
 import javax.annotation.Nullable;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 
@@ -55,18 +54,19 @@ public abstract class PslDataSourceOptions implements Serializable {
 
   public abstract SubscriptionPath subscriptionPath();
 
-  @Nullable
   public abstract FlowControlSettings flowControlSettings();
 
-  public abstract long maxBatchOffsetRange();
+  public abstract long maxMessagesPerBatch();
 
   public static Builder builder() {
     return new AutoValue_PslDataSourceOptions.Builder()
         .setCredentialsKey(null)
-        // TODO(jiangmichael): Revisit this later about if we need to expose this as a user
-        // configurable option. Ideally we should expose bytes range/# msgs range not
-        // offsets range since PSL doesn't guarantee offset = msg.
-        .setMaxBatchOffsetRange(Constants.DEFAULT_BATCH_OFFSET_RANGE);
+        .setMaxMessagesPerBatch(Constants.DEFAULT_MAX_MESSAGES_PER_BATCH)
+        .setFlowControlSettings(
+            FlowControlSettings.builder()
+                .setMessagesOutstanding(Constants.DEFAULT_MESSAGES_OUTSTANDING)
+                .setBytesOutstanding(Constants.DEFAULT_BYTES_OUTSTANDING)
+                .build());
   }
 
@@ -75,10 +75,10 @@ public static PslDataSourceOptions fromSparkDataSourceOptions(DataSourceOptions
     }
     Builder builder = builder();
-    Optional<String> value;
-    if ((value = options.get(Constants.CREDENTIALS_KEY_CONFIG_KEY)).isPresent()) {
-      builder.setCredentialsKey(value.get());
-    }
+    options.get(Constants.CREDENTIALS_KEY_CONFIG_KEY).ifPresent(builder::setCredentialsKey);
+    options
+        .get(Constants.MAX_MESSAGE_PER_BATCH_CONFIG_KEY)
+        .ifPresent(mmpb -> builder.setMaxMessagesPerBatch(Long.parseLong(mmpb)));
     return builder
         .setSubscriptionPath(
             SubscriptionPath.parse(options.get(Constants.SUBSCRIPTION_CONFIG_KEY).get()))
@@ -103,7 +103,7 @@ public abstract static class Builder {
 
     public abstract Builder setSubscriptionPath(SubscriptionPath subscriptionPath);
 
-    public abstract Builder setMaxBatchOffsetRange(long maxBatchOffsetRange);
+    public abstract Builder setMaxMessagesPerBatch(long maxMessagesPerBatch);
 
     public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings);
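
One observation on the parsing above: Long.parseLong surfaces a bare NumberFormatException for a malformed value, and a zero or negative cap is accepted silently. A possible hardening, sketched with a hypothetical parsePositiveLong helper that is not part of this change:

    import static com.google.common.base.Preconditions.checkArgument;

    public final class OptionParsing {
      // Hypothetical helper: parse an option value that must be a positive
      // long, naming the offending key and value in the error message.
      static long parsePositiveLong(String key, String value) {
        long parsed;
        try {
          parsed = Long.parseLong(value);
        } catch (NumberFormatException e) {
          throw new IllegalArgumentException(key + " must be a long, was: " + value, e);
        }
        checkArgument(parsed > 0, "%s must be positive, was: %s", key, value);
        return parsed;
      }

      public static void main(String[] args) {
        // Prints 100000; a value of "-5" or "abc" would throw
        // IllegalArgumentException naming the key.
        System.out.println(
            parsePositiveLong("pubsublite.flowcontrol.maxmessagesperbatch", "100000"));
      }
    }
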
"null" : currentMsg.offset().value(), + endOffset.offset())); } catch (Throwable t) { throw new IllegalStateException("Failed to retrieve messages.", t); } diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java b/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java index f3828792..0c2533f8 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java @@ -42,6 +42,7 @@ public class PslMicroBatchReader implements MicroBatchReader { private final SubscriptionPath subscriptionPath; private final FlowControlSettings flowControlSettings; private final long topicPartitionCount; + private final long maxMessagesPerBatch; @Nullable private SparkSourceOffset startOffset = null; private SparkSourceOffset endOffset; @@ -52,6 +53,7 @@ public PslMicroBatchReader( PerTopicHeadOffsetReader headOffsetReader, SubscriptionPath subscriptionPath, FlowControlSettings flowControlSettings, + long maxMessagesPerBatch, long topicPartitionCount) { this.cursorClient = cursorClient; this.committer = committer; @@ -60,6 +62,7 @@ public PslMicroBatchReader( this.subscriptionPath = subscriptionPath; this.flowControlSettings = flowControlSettings; this.topicPartitionCount = topicPartitionCount; + this.maxMessagesPerBatch = maxMessagesPerBatch; } @Override @@ -79,7 +82,11 @@ public void setOffsetRange(Optional start, Optional end) { "end offset is not instance of SparkSourceOffset."); endOffset = (SparkSourceOffset) end.get(); } else { - endOffset = PslSparkUtils.toSparkSourceOffset(headOffsetReader.getHeadOffset()); + SparkSourceOffset headOffset = + PslSparkUtils.toSparkSourceOffset(headOffsetReader.getHeadOffset()); + endOffset = + PslSparkUtils.getSparkEndOffset( + headOffset, startOffset, maxMessagesPerBatch, topicPartitionCount); } } diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java b/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java index 962d5dc0..4487ce9a 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java @@ -25,6 +25,7 @@ import com.google.cloud.pubsublite.SubscriptionPath; import com.google.cloud.pubsublite.internal.CursorClient; import com.google.common.collect.ListMultimap; +import com.google.common.math.LongMath; import com.google.protobuf.ByteString; import com.google.protobuf.util.Timestamps; import java.util.ArrayList; @@ -132,4 +133,24 @@ public static SparkSourceOffset getSparkStartOffset( "Failed to get information from PSL and construct startOffset", e); } } + + // EndOffset = min(startOffset + batchOffsetRange, headOffset) + public static SparkSourceOffset getSparkEndOffset( + SparkSourceOffset headOffset, + SparkSourceOffset startOffset, + long maxMessagesPerBatch, + long topicPartitionCount) { + Map map = new HashMap<>(); + for (int i = 0; i < topicPartitionCount; i++) { + Partition p = Partition.of(i); + SparkPartitionOffset emptyPartition = SparkPartitionOffset.create(p, -1L); + long head = headOffset.getPartitionOffsetMap().getOrDefault(p, emptyPartition).offset(); + long start = startOffset.getPartitionOffsetMap().getOrDefault(p, emptyPartition).offset(); + map.put( + p, + SparkPartitionOffset.create( + p, Math.min(LongMath.saturatedAdd(start, maxMessagesPerBatch), head))); + } + return new SparkSourceOffset(map); + } } diff --git 
diff --git a/src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchReaderTest.java b/src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchReaderTest.java
index c8f64408..13649f05 100644
--- a/src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchReaderTest.java
+++ b/src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchReaderTest.java
@@ -40,6 +40,7 @@ public class PslMicroBatchReaderTest {
   private final PartitionSubscriberFactory partitionSubscriberFactory =
       mock(PartitionSubscriberFactory.class);
   private final PerTopicHeadOffsetReader headOffsetReader = mock(PerTopicHeadOffsetReader.class);
+  private static final long MAX_MESSAGES_PER_BATCH = 20000;
   private final PslMicroBatchReader reader =
       new PslMicroBatchReader(
           cursorClient,
@@ -48,6 +49,7 @@ public class PslMicroBatchReaderTest {
           headOffsetReader,
           UnitTestExamples.exampleSubscriptionPath(),
           OPTIONS.flowControlSettings(),
+          MAX_MESSAGES_PER_BATCH,
           2);
 
   private PslSourceOffset createPslSourceOffsetTwoPartition(long offset0, long offset1) {
@@ -119,4 +121,22 @@ public void testPlanInputPartitionNoMessage() {
     reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset));
     assertThat(reader.planInputPartitions()).hasSize(1);
   }
+
+  @Test
+  public void testMaxMessagesPerBatch() {
+    when(cursorClient.listPartitionCursors(UnitTestExamples.exampleSubscriptionPath()))
+        .thenReturn(ApiFutures.immediateFuture(ImmutableMap.of(Partition.of(0L), Offset.of(100L))));
+    when(headOffsetReader.getHeadOffset())
+        .thenReturn(createPslSourceOffsetTwoPartition(10000000L, 0L));
+    reader.setOffsetRange(Optional.empty(), Optional.empty());
+    assertThat(((SparkSourceOffset) reader.getEndOffset()).getPartitionOffsetMap())
+        .containsExactly(
+            Partition.of(0L),
+            // The maxMessagesPerBatch setting takes effect, as
+            // 100L + maxMessagesPerBatch is less than
+            // 10000000L.
+            SparkPartitionOffset.create(Partition.of(0L), 100L + MAX_MESSAGES_PER_BATCH - 1L),
+            Partition.of(1L),
+            SparkPartitionOffset.create(Partition.of(1L), -1L));
+  }
 }
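
A note on the expected value in testMaxMessagesPerBatch, assuming toSparkSourceOffset converts a PSL cursor (the next offset to deliver) into a Spark offset (the last offset already processed) by subtracting one: the committed cursor of 100 becomes a Spark start offset of 99, so partition 0's end offset is min(99 + 20000, 10000000 - 1) = 20099, which is exactly 100L + MAX_MESSAGES_PER_BATCH - 1L. Partition 1 has no cursor and no messages, so it stays at the sentinel -1L.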