Skip to content

Commit

Permalink
feat: Add max message per batch option (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangmichaellll committed Jan 29, 2021
1 parent d340c52 commit e9c640f
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 15 deletions.
Expand Up @@ -25,7 +25,7 @@
public class Constants {
public static long DEFAULT_BYTES_OUTSTANDING = 50_000_000;
public static long DEFAULT_MESSAGES_OUTSTANDING = Long.MAX_VALUE;
public static int DEFAULT_BATCH_OFFSET_RANGE = 100_000;
public static long DEFAULT_MAX_MESSAGES_PER_BATCH = Long.MAX_VALUE;
public static StructType DEFAULT_SCHEMA =
new StructType(
new StructField[] {
Expand All @@ -46,6 +46,8 @@ public class Constants {

public static final PubsubContext.Framework FRAMEWORK = PubsubContext.Framework.of("SPARK");

public static String MAX_MESSAGE_PER_BATCH_CONFIG_KEY =
"pubsublite.flowcontrol.maxmessagesperbatch";
public static String BYTES_OUTSTANDING_CONFIG_KEY =
"pubsublite.flowcontrol.byteoutstandingperpartition";
public static String MESSAGES_OUTSTANDING_CONFIG_KEY =
Expand Down
Expand Up @@ -98,6 +98,7 @@ public MicroBatchReader createMicroBatchReader(
Ticker.systemTicker()),
subscriptionPath,
Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
pslDataSourceOptions.maxMessagesPerBatch(),
topicPartitionCount);
}
}
Expand Up @@ -42,7 +42,6 @@
import com.google.cloud.pubsublite.v1.TopicStatsServiceSettings;
import java.io.IOException;
import java.io.Serializable;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.spark.sql.sources.v2.DataSourceOptions;

Expand All @@ -55,18 +54,19 @@ public abstract class PslDataSourceOptions implements Serializable {

public abstract SubscriptionPath subscriptionPath();

@Nullable
public abstract FlowControlSettings flowControlSettings();

public abstract long maxBatchOffsetRange();
public abstract long maxMessagesPerBatch();

public static Builder builder() {
return new AutoValue_PslDataSourceOptions.Builder()
.setCredentialsKey(null)
// TODO(jiangmichael): Revisit this later about if we need to expose this as a user
// configurable option. Ideally we should expose bytes range/# msgs range not
// offsets range since PSL doesn't guarantee offset = msg.
.setMaxBatchOffsetRange(Constants.DEFAULT_BATCH_OFFSET_RANGE);
.setMaxMessagesPerBatch(Constants.DEFAULT_MAX_MESSAGES_PER_BATCH)
.setFlowControlSettings(
FlowControlSettings.builder()
.setMessagesOutstanding(Constants.DEFAULT_MESSAGES_OUTSTANDING)
.setBytesOutstanding(Constants.DEFAULT_BYTES_OUTSTANDING)
.build());
}

public static PslDataSourceOptions fromSparkDataSourceOptions(DataSourceOptions options) {
Expand All @@ -75,10 +75,10 @@ public static PslDataSourceOptions fromSparkDataSourceOptions(DataSourceOptions
}

Builder builder = builder();
Optional<String> value;
if ((value = options.get(Constants.CREDENTIALS_KEY_CONFIG_KEY)).isPresent()) {
builder.setCredentialsKey(value.get());
}
options.get(Constants.CREDENTIALS_KEY_CONFIG_KEY).ifPresent(builder::setCredentialsKey);
options
.get(Constants.MAX_MESSAGE_PER_BATCH_CONFIG_KEY)
.ifPresent(mmpb -> builder.setMaxMessagesPerBatch(Long.parseLong(mmpb)));
return builder
.setSubscriptionPath(
SubscriptionPath.parse(options.get(Constants.SUBSCRIPTION_CONFIG_KEY).get()))
Expand All @@ -103,7 +103,7 @@ public abstract static class Builder {

public abstract Builder setSubscriptionPath(SubscriptionPath subscriptionPath);

public abstract Builder setMaxBatchOffsetRange(long maxBatchOffsetRange);
public abstract Builder setMaxMessagesPerBatch(long maxMessagesPerBatch);

public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings);

Expand Down
Expand Up @@ -64,7 +64,13 @@ public boolean next() {
msg = subscriber.messageIfAvailable();
break;
} catch (TimeoutException e) {
log.atWarning().log("Unable to get any messages in last " + SUBSCRIBER_PULL_TIMEOUT);
log.atWarning().log(
String.format(
"Unable to get any messages in last %s. Partition: %d; Current message offset: %s; End message offset: %d.",
SUBSCRIBER_PULL_TIMEOUT.toString(),
endOffset.partition().value(),
currentMsg == null ? "null" : currentMsg.offset().value(),
endOffset.offset()));
} catch (Throwable t) {
throw new IllegalStateException("Failed to retrieve messages.", t);
}
Expand Down
Expand Up @@ -42,6 +42,7 @@ public class PslMicroBatchReader implements MicroBatchReader {
private final SubscriptionPath subscriptionPath;
private final FlowControlSettings flowControlSettings;
private final long topicPartitionCount;
private final long maxMessagesPerBatch;
@Nullable private SparkSourceOffset startOffset = null;
private SparkSourceOffset endOffset;

Expand All @@ -52,6 +53,7 @@ public PslMicroBatchReader(
PerTopicHeadOffsetReader headOffsetReader,
SubscriptionPath subscriptionPath,
FlowControlSettings flowControlSettings,
long maxMessagesPerBatch,
long topicPartitionCount) {
this.cursorClient = cursorClient;
this.committer = committer;
Expand All @@ -60,6 +62,7 @@ public PslMicroBatchReader(
this.subscriptionPath = subscriptionPath;
this.flowControlSettings = flowControlSettings;
this.topicPartitionCount = topicPartitionCount;
this.maxMessagesPerBatch = maxMessagesPerBatch;
}

@Override
Expand All @@ -79,7 +82,11 @@ public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
"end offset is not instance of SparkSourceOffset.");
endOffset = (SparkSourceOffset) end.get();
} else {
endOffset = PslSparkUtils.toSparkSourceOffset(headOffsetReader.getHeadOffset());
SparkSourceOffset headOffset =
PslSparkUtils.toSparkSourceOffset(headOffsetReader.getHeadOffset());
endOffset =
PslSparkUtils.getSparkEndOffset(
headOffset, startOffset, maxMessagesPerBatch, topicPartitionCount);
}
}

Expand Down
21 changes: 21 additions & 0 deletions src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java
Expand Up @@ -25,6 +25,7 @@
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.common.collect.ListMultimap;
import com.google.common.math.LongMath;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import java.util.ArrayList;
Expand Down Expand Up @@ -132,4 +133,24 @@ public static SparkSourceOffset getSparkStartOffset(
"Failed to get information from PSL and construct startOffset", e);
}
}

/**
 * Builds the end offset of a batch, with one entry per partition index in
 * {@code [0, topicPartitionCount)}.
 *
 * <p>Each partition's end offset is {@code min(start + maxMessagesPerBatch, head)}. The sum is
 * computed with a saturating add, so it clamps instead of overflowing. A partition that is
 * absent from either offset map is treated as having offset {@code -1}.
 *
 * @param headOffset per-partition head offsets
 * @param startOffset per-partition start offsets of the batch
 * @param maxMessagesPerBatch amount added to each start offset before capping at the head
 * @param topicPartitionCount number of partitions to produce entries for
 * @return a new {@link SparkSourceOffset} holding the capped end offset of every partition
 */
public static SparkSourceOffset getSparkEndOffset(
    SparkSourceOffset headOffset,
    SparkSourceOffset startOffset,
    long maxMessagesPerBatch,
    long topicPartitionCount) {
  Map<Partition, SparkPartitionOffset> endOffsets = new HashMap<>();
  int index = 0;
  while (index < topicPartitionCount) {
    Partition partition = Partition.of(index);
    // Stand-in used when a partition has no entry in the head or start map.
    SparkPartitionOffset missing = SparkPartitionOffset.create(partition, -1L);
    long headValue =
        headOffset.getPartitionOffsetMap().getOrDefault(partition, missing).offset();
    long startValue =
        startOffset.getPartitionOffsetMap().getOrDefault(partition, missing).offset();
    // Saturating add keeps a very large maxMessagesPerBatch from wrapping around.
    long uncapped = LongMath.saturatedAdd(startValue, maxMessagesPerBatch);
    long endValue = uncapped <= headValue ? uncapped : headValue;
    endOffsets.put(partition, SparkPartitionOffset.create(partition, endValue));
    index++;
  }
  return new SparkSourceOffset(endOffsets);
}
}
Expand Up @@ -40,6 +40,7 @@ public class PslMicroBatchReaderTest {
private final PartitionSubscriberFactory partitionSubscriberFactory =
mock(PartitionSubscriberFactory.class);
private final PerTopicHeadOffsetReader headOffsetReader = mock(PerTopicHeadOffsetReader.class);
private static final long MAX_MESSAGES_PER_BATCH = 20000;
private final PslMicroBatchReader reader =
new PslMicroBatchReader(
cursorClient,
Expand All @@ -48,6 +49,7 @@ public class PslMicroBatchReaderTest {
headOffsetReader,
UnitTestExamples.exampleSubscriptionPath(),
OPTIONS.flowControlSettings(),
MAX_MESSAGES_PER_BATCH,
2);

private PslSourceOffset createPslSourceOffsetTwoPartition(long offset0, long offset1) {
Expand Down Expand Up @@ -119,4 +121,22 @@ public void testPlanInputPartitionNoMessage() {
reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset));
assertThat(reader.planInputPartitions()).hasSize(1);
}

@Test
public void testMaxMessagesPerBatch() {
  // Stub a cursor at offset 100 for partition 0 only, and head offsets of 10000000 and 0.
  when(cursorClient.listPartitionCursors(UnitTestExamples.exampleSubscriptionPath()))
      .thenReturn(ApiFutures.immediateFuture(ImmutableMap.of(Partition.of(0L), Offset.of(100L))));
  when(headOffsetReader.getHeadOffset())
      .thenReturn(createPslSourceOffsetTwoPartition(10000000L, 0L));

  // With no explicit start or end, the reader has to derive the end offset itself.
  reader.setOffsetRange(Optional.empty(), Optional.empty());

  SparkSourceOffset actualEnd = (SparkSourceOffset) reader.getEndOffset();
  Partition firstPartition = Partition.of(0L);
  Partition secondPartition = Partition.of(1L);
  // The maxMessagesPerBatch setting takes effect on partition 0 because
  // 100L + maxMessagesPerBatch is less than 10000000L.
  long cappedOffset = 100L + MAX_MESSAGES_PER_BATCH - 1L;
  assertThat(actualEnd.getPartitionOffsetMap())
      .containsExactly(
          firstPartition,
          SparkPartitionOffset.create(firstPartition, cappedOffset),
          secondPartition,
          SparkPartitionOffset.create(secondPartition, -1L));
}
}

0 comments on commit e9c640f

Please sign in to comment.