feat: Add max message per batch option #14

Merged · 11 commits · Jan 29, 2021
src/main/java/com/google/cloud/pubsublite/spark/Constants.java
@@ -25,7 +25,7 @@
 public class Constants {
   public static long DEFAULT_BYTES_OUTSTANDING = 50_000_000;
   public static long DEFAULT_MESSAGES_OUTSTANDING = Long.MAX_VALUE;
-  public static int DEFAULT_BATCH_OFFSET_RANGE = 100_000;
+  public static long DEFAULT_MAX_MESSAGES_PER_BATCH = Long.MAX_VALUE;
   public static StructType DEFAULT_SCHEMA =
       new StructType(
           new StructField[] {
@@ -46,6 +46,8 @@ public class Constants {

   public static final PubsubContext.Framework FRAMEWORK = PubsubContext.Framework.of("SPARK");

+  public static String MAX_MESSAGE_PER_BATCH_CONFIG_KEY =
+      "pubsublite.flowcontrol.maxmessagesperbatch";
   public static String BYTES_OUTSTANDING_CONFIG_KEY =
       "pubsublite.flowcontrol.byteoutstandingperpartition";
   public static String MESSAGES_OUTSTANDING_CONFIG_KEY =
src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java
@@ -98,6 +98,7 @@ public MicroBatchReader createMicroBatchReader(
             Ticker.systemTicker()),
         subscriptionPath,
         Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
+        pslDataSourceOptions.maxMessagesPerBatch(),
         topicPartitionCount);
   }
 }
src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java
@@ -42,7 +42,6 @@
 import com.google.cloud.pubsublite.v1.TopicStatsServiceSettings;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Optional;
 import javax.annotation.Nullable;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;

@@ -55,18 +54,19 @@ public abstract class PslDataSourceOptions implements Serializable {

   public abstract SubscriptionPath subscriptionPath();

-  @Nullable
   public abstract FlowControlSettings flowControlSettings();

-  public abstract long maxBatchOffsetRange();
+  public abstract long maxMessagesPerBatch();

   public static Builder builder() {
     return new AutoValue_PslDataSourceOptions.Builder()
         .setCredentialsKey(null)
-        // TODO(jiangmichael): Revisit this later about if we need to expose this as a user
-        //  configurable option. Ideally we should expose bytes range/# msgs range not
-        //  offsets range since PSL doesn't guarantee offset = msg.
-        .setMaxBatchOffsetRange(Constants.DEFAULT_BATCH_OFFSET_RANGE);
+        .setMaxMessagesPerBatch(Constants.DEFAULT_MAX_MESSAGES_PER_BATCH)
+        .setFlowControlSettings(
+            FlowControlSettings.builder()
+                .setMessagesOutstanding(Constants.DEFAULT_MESSAGES_OUTSTANDING)
+                .setBytesOutstanding(Constants.DEFAULT_BYTES_OUTSTANDING)
+                .build());
   }

   public static PslDataSourceOptions fromSparkDataSourceOptions(DataSourceOptions options) {
@@ -75,10 +75,10 @@ public static PslDataSourceOptions fromSparkDataSourceOptions(
     }

     Builder builder = builder();
-    Optional<String> value;
-    if ((value = options.get(Constants.CREDENTIALS_KEY_CONFIG_KEY)).isPresent()) {
-      builder.setCredentialsKey(value.get());
-    }
+    options.get(Constants.CREDENTIALS_KEY_CONFIG_KEY).ifPresent(builder::setCredentialsKey);
+    options
+        .get(Constants.MAX_MESSAGE_PER_BATCH_CONFIG_KEY)
+        .ifPresent(mmpb -> builder.setMaxMessagesPerBatch(Long.parseLong(mmpb)));
     return builder
         .setSubscriptionPath(
             SubscriptionPath.parse(options.get(Constants.SUBSCRIPTION_CONFIG_KEY).get()))
@@ -103,7 +103,7 @@ public abstract static class Builder {

     public abstract Builder setSubscriptionPath(SubscriptionPath subscriptionPath);

-    public abstract Builder setMaxBatchOffsetRange(long maxBatchOffsetRange);
+    public abstract Builder setMaxMessagesPerBatch(long maxMessagesPerBatch);

     public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings);

src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReader.java
@@ -64,7 +64,13 @@ public boolean next() {
         msg = subscriber.messageIfAvailable();
         break;
       } catch (TimeoutException e) {
-        log.atWarning().log("Unable to get any messages in last " + SUBSCRIBER_PULL_TIMEOUT);
+        log.atWarning().log(
+            String.format(
+                "Unable to get any messages in last %s. Partition: %d; Current message offset: %d; End message offset: %d.",
+                SUBSCRIBER_PULL_TIMEOUT.toString(),
+                endOffset.partition().value(),
+                currentMsg != null ? currentMsg.offset().value() : null,
+                endOffset.offset()));
       } catch (Throwable t) {
         throw new IllegalStateException("Failed to retrieve messages.", t);
       }
src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java
@@ -42,6 +42,7 @@ public class PslMicroBatchReader implements MicroBatchReader {
   private final SubscriptionPath subscriptionPath;
   private final FlowControlSettings flowControlSettings;
   private final long topicPartitionCount;
+  private final long maxMessagesPerBatch;
   @Nullable private SparkSourceOffset startOffset = null;
   private SparkSourceOffset endOffset;

@@ -52,6 +53,7 @@ public PslMicroBatchReader(
       PerTopicHeadOffsetReader headOffsetReader,
       SubscriptionPath subscriptionPath,
       FlowControlSettings flowControlSettings,
+      long maxMessagesPerBatch,
       long topicPartitionCount) {
     this.cursorClient = cursorClient;
     this.committer = committer;
@@ -60,6 +62,7 @@
     this.subscriptionPath = subscriptionPath;
     this.flowControlSettings = flowControlSettings;
     this.topicPartitionCount = topicPartitionCount;
+    this.maxMessagesPerBatch = maxMessagesPerBatch;
   }

   @Override
@@ -79,7 +82,11 @@ public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
"end offset is not instance of SparkSourceOffset.");
endOffset = (SparkSourceOffset) end.get();
} else {
endOffset = PslSparkUtils.toSparkSourceOffset(headOffsetReader.getHeadOffset());
SparkSourceOffset headOffset =
PslSparkUtils.toSparkSourceOffset(headOffsetReader.getHeadOffset());
endOffset =
PslSparkUtils.getSparkEndOffset(
headOffset, startOffset, maxMessagesPerBatch, topicPartitionCount);
}
}

src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java (21 additions, 0 deletions)
@@ -25,6 +25,7 @@
 import com.google.cloud.pubsublite.SubscriptionPath;
 import com.google.cloud.pubsublite.internal.CursorClient;
 import com.google.common.collect.ListMultimap;
+import com.google.common.math.LongMath;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.util.Timestamps;
 import java.util.ArrayList;
@@ -132,4 +133,24 @@ public static SparkSourceOffset getSparkStartOffset(
"Failed to get information from PSL and construct startOffset", e);
}
}

+  // EndOffset = min(startOffset + maxMessagesPerBatch, headOffset)
+  public static SparkSourceOffset getSparkEndOffset(
+      SparkSourceOffset headOffset,
+      SparkSourceOffset startOffset,
+      long maxMessagesPerBatch,
+      long topicPartitionCount) {
+    Map<Partition, SparkPartitionOffset> map = new HashMap<>();
+    for (int i = 0; i < topicPartitionCount; i++) {
+      Partition p = Partition.of(i);
+      SparkPartitionOffset emptyPartition = SparkPartitionOffset.create(p, -1L);

[Review thread on the line above]
Contributor: not for here, but can you check that this doesn't error out when the partition is actually empty?
Author (jiangmichaellll): will do in real test.

+      long head = headOffset.getPartitionOffsetMap().getOrDefault(p, emptyPartition).offset();
+      long start = startOffset.getPartitionOffsetMap().getOrDefault(p, emptyPartition).offset();
+      map.put(
+          p,
+          SparkPartitionOffset.create(
+              p, Math.min(LongMath.saturatedAdd(start, maxMessagesPerBatch), head)));
+    }
+    return new SparkSourceOffset(map);
+  }
 }
src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchReaderTest.java
@@ -40,6 +40,7 @@ public class PslMicroBatchReaderTest {
   private final PartitionSubscriberFactory partitionSubscriberFactory =
       mock(PartitionSubscriberFactory.class);
   private final PerTopicHeadOffsetReader headOffsetReader = mock(PerTopicHeadOffsetReader.class);
+  private final long maxMessagesPerBatch = 20000;
   private final PslMicroBatchReader reader =
       new PslMicroBatchReader(
           cursorClient,
@@ -48,6 +49,7 @@ public class PslMicroBatchReaderTest {
           headOffsetReader,
           UnitTestExamples.exampleSubscriptionPath(),
           OPTIONS.flowControlSettings(),
+          maxMessagesPerBatch,
           2);

   private PslSourceOffset createPslSourceOffsetTwoPartition(long offset0, long offset1) {
@@ -119,4 +121,22 @@ public void testPlanInputPartitionNoMessage() {
     reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset));
     assertThat(reader.planInputPartitions()).hasSize(1);
   }
+
+  @Test
+  public void testMaxMessagesPerBatch() {
+    when(cursorClient.listPartitionCursors(UnitTestExamples.exampleSubscriptionPath()))
+        .thenReturn(ApiFutures.immediateFuture(ImmutableMap.of(Partition.of(0L), Offset.of(100L))));
+    when(headOffsetReader.getHeadOffset())
+        .thenReturn(createPslSourceOffsetTwoPartition(10000000L, 0L));
+    reader.setOffsetRange(Optional.empty(), Optional.empty());
+    assertThat(((SparkSourceOffset) reader.getEndOffset()).getPartitionOffsetMap())
+        .containsExactly(
+            Partition.of(0L),
+            // the maxMessagesPerBatch setting takes effect, as 100L +
+            // maxMessagesPerBatch is less than the head offset of
+            // 10000000L.
+            SparkPartitionOffset.create(Partition.of(0L), 100L + maxMessagesPerBatch - 1L),
+            Partition.of(1L),
+            SparkPartitionOffset.create(Partition.of(1L), -1L));
+  }
 }
}