Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add max message per batch option #14

Merged
merged 11 commits into from Jan 29, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -25,7 +25,6 @@
public class Constants {
public static long DEFAULT_BYTES_OUTSTANDING = 50_000_000;
public static long DEFAULT_MESSAGES_OUTSTANDING = Long.MAX_VALUE;
public static int DEFAULT_BATCH_OFFSET_RANGE = 100_000;
public static StructType DEFAULT_SCHEMA =
new StructType(
new StructField[] {
Expand All @@ -46,6 +45,8 @@ public class Constants {

public static final PubsubContext.Framework FRAMEWORK = PubsubContext.Framework.of("SPARK");

public static String MAX_MESSAGE_PER_BATCH_CONFIG_KEY =
"pubsublite.flowcontrol.maxmessageperbatch";
public static String BYTES_OUTSTANDING_CONFIG_KEY =
"pubsublite.flowcontrol.byteoutstandingperpartition";
public static String MESSAGES_OUTSTANDING_CONFIG_KEY =
Expand Down
Expand Up @@ -98,6 +98,7 @@ public MicroBatchReader createMicroBatchReader(
Ticker.systemTicker()),
subscriptionPath,
Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
pslDataSourceOptions.maxMessagePerBatch(),
topicPartitionCount);
}
}
Expand Up @@ -58,15 +58,11 @@ public abstract class PslDataSourceOptions implements Serializable {
@Nullable
jiangmichaellll marked this conversation as resolved.
Show resolved Hide resolved
public abstract FlowControlSettings flowControlSettings();

public abstract long maxBatchOffsetRange();
@Nullable
public abstract Long maxMessagePerBatch();
jiangmichaellll marked this conversation as resolved.
Show resolved Hide resolved

public static Builder builder() {
return new AutoValue_PslDataSourceOptions.Builder()
.setCredentialsKey(null)
// TODO(jiangmichael): Revisit this later about if we need to expose this as a user
// configurable option. Ideally we should expose bytes range/# msgs range not
// offsets range since PSL doesn't guarantee offset = msg.
.setMaxBatchOffsetRange(Constants.DEFAULT_BATCH_OFFSET_RANGE);
jiangmichaellll marked this conversation as resolved.
Show resolved Hide resolved
return new AutoValue_PslDataSourceOptions.Builder().setCredentialsKey(null);
}

public static PslDataSourceOptions fromSparkDataSourceOptions(DataSourceOptions options) {
Expand All @@ -75,9 +71,13 @@ public static PslDataSourceOptions fromSparkDataSourceOptions(DataSourceOptions
}

Builder builder = builder();
Optional<String> value;
if ((value = options.get(Constants.CREDENTIALS_KEY_CONFIG_KEY)).isPresent()) {
builder.setCredentialsKey(value.get());
Optional<String> cred;
if ((cred = options.get(Constants.CREDENTIALS_KEY_CONFIG_KEY)).isPresent()) {
jiangmichaellll marked this conversation as resolved.
Show resolved Hide resolved
builder.setCredentialsKey(cred.get());
}
Optional<String> bor;
if ((bor = options.get(Constants.MAX_MESSAGE_PER_BATCH_CONFIG_KEY)).isPresent()) {
builder.setMaxMessagePerBatch(Long.parseLong(bor.get()));
}
return builder
.setSubscriptionPath(
Expand All @@ -103,7 +103,7 @@ public abstract static class Builder {

public abstract Builder setSubscriptionPath(SubscriptionPath subscriptionPath);

public abstract Builder setMaxBatchOffsetRange(long maxBatchOffsetRange);
public abstract Builder setMaxMessagePerBatch(long maxMessagePerBatch);

public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings);

Expand Down
Expand Up @@ -64,7 +64,13 @@ public boolean next() {
msg = subscriber.messageIfAvailable();
break;
} catch (TimeoutException e) {
log.atWarning().log("Unable to get any messages in last " + SUBSCRIBER_PULL_TIMEOUT);
log.atWarning().log(
String.format(
"Unable to get any messages in last %s. Partition: %d; Current message offset: %d; End message offset: %d.",
SUBSCRIBER_PULL_TIMEOUT.toString(),
endOffset.partition().value(),
currentMsg != null ? currentMsg.offset().value() : null,
jiangmichaellll marked this conversation as resolved.
Show resolved Hide resolved
jiangmichaellll marked this conversation as resolved.
Show resolved Hide resolved
endOffset.offset()));
} catch (Throwable t) {
throw new IllegalStateException("Failed to retrieve messages.", t);
}
Expand Down
Expand Up @@ -42,6 +42,7 @@ public class PslMicroBatchReader implements MicroBatchReader {
private final SubscriptionPath subscriptionPath;
private final FlowControlSettings flowControlSettings;
private final long topicPartitionCount;
@Nullable private final Long maxMessagePerBatch;
@Nullable private SparkSourceOffset startOffset = null;
private SparkSourceOffset endOffset;

Expand All @@ -52,6 +53,7 @@ public PslMicroBatchReader(
PerTopicHeadOffsetReader headOffsetReader,
SubscriptionPath subscriptionPath,
FlowControlSettings flowControlSettings,
@Nullable Long maxMessagePerBatch,
long topicPartitionCount) {
this.cursorClient = cursorClient;
this.committer = committer;
Expand All @@ -60,6 +62,7 @@ public PslMicroBatchReader(
this.subscriptionPath = subscriptionPath;
this.flowControlSettings = flowControlSettings;
this.topicPartitionCount = topicPartitionCount;
this.maxMessagePerBatch = maxMessagePerBatch;
}

@Override
Expand All @@ -79,7 +82,11 @@ public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
"end offset is not instance of SparkSourceOffset.");
endOffset = (SparkSourceOffset) end.get();
} else {
endOffset = PslSparkUtils.toSparkSourceOffset(headOffsetReader.getHeadOffset());
SparkSourceOffset headOffset =
PslSparkUtils.toSparkSourceOffset(headOffsetReader.getHeadOffset());
endOffset =
PslSparkUtils.getSparkEndOffset(
headOffset, startOffset, maxMessagePerBatch, topicPartitionCount);
}
}

Expand Down
22 changes: 22 additions & 0 deletions src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java
Expand Up @@ -34,6 +34,7 @@
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
import org.apache.spark.sql.catalyst.util.GenericArrayData;
Expand Down Expand Up @@ -132,4 +133,25 @@ public static SparkSourceOffset getSparkStartOffset(
"Failed to get information from PSL and construct startOffset", e);
}
}

// EndOffset = min(startOffset + batchOffsetRange, headOffset)
public static SparkSourceOffset getSparkEndOffset(
SparkSourceOffset headOffset,
SparkSourceOffset startOffset,
@Nullable Long maxMessagePerBatch,
jiangmichaellll marked this conversation as resolved.
Show resolved Hide resolved
long topicPartitionCount) {
Map<Partition, SparkPartitionOffset> map = new HashMap<>();
for (int i = 0; i < topicPartitionCount; i++) {
Partition p = Partition.of(i);
SparkPartitionOffset emptyPartition = SparkPartitionOffset.create(p, -1L);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not for here, but can you check that this doesn't error out when the partition is actually empty?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do in real test.

long head = headOffset.getPartitionOffsetMap().getOrDefault(p, emptyPartition).offset();
long start = startOffset.getPartitionOffsetMap().getOrDefault(p, emptyPartition).offset();
if (maxMessagePerBatch != null) {
map.put(p, SparkPartitionOffset.create(p, Math.min(start + maxMessagePerBatch, head)));
} else {
map.put(p, SparkPartitionOffset.create(p, head));
}
}
return new SparkSourceOffset(map);
}
}
Expand Up @@ -40,6 +40,7 @@ public class PslMicroBatchReaderTest {
private final PartitionSubscriberFactory partitionSubscriberFactory =
mock(PartitionSubscriberFactory.class);
private final PerTopicHeadOffsetReader headOffsetReader = mock(PerTopicHeadOffsetReader.class);
private final long maxMessagePerBatch = 20000;
private final PslMicroBatchReader reader =
new PslMicroBatchReader(
cursorClient,
Expand All @@ -48,6 +49,7 @@ public class PslMicroBatchReaderTest {
headOffsetReader,
UnitTestExamples.exampleSubscriptionPath(),
OPTIONS.flowControlSettings(),
maxMessagePerBatch,
2);

private PslSourceOffset createPslSourceOffsetTwoPartition(long offset0, long offset1) {
Expand Down Expand Up @@ -119,4 +121,21 @@ public void testPlanInputPartitionNoMessage() {
reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset));
assertThat(reader.planInputPartitions()).hasSize(1);
}

@Test
public void testMaxMessagePerBatch() {
  // Partition 0 starts at cursor 100 with a very large head offset, so its batch
  // end is capped by maxMessagePerBatch; partition 1 has no messages at all.
  when(cursorClient.listPartitionCursors(UnitTestExamples.exampleSubscriptionPath()))
      .thenReturn(ApiFutures.immediateFuture(ImmutableMap.of(Partition.of(0L), Offset.of(100L))));
  when(headOffsetReader.getHeadOffset())
      .thenReturn(createPslSourceOffsetTwoPartition(10000000L, 0L));

  reader.setOffsetRange(Optional.empty(), Optional.empty());

  // Spark offsets are inclusive, hence the trailing -1 on the capped partition
  // (100 + maxMessagePerBatch - 1) and the -1 sentinel for the empty partition.
  SparkPartitionOffset cappedEnd =
      SparkPartitionOffset.create(Partition.of(0L), 100L + maxMessagePerBatch - 1L);
  SparkPartitionOffset emptyEnd = SparkPartitionOffset.create(Partition.of(1L), -1L);
  assertThat(((SparkSourceOffset) reader.getEndOffset()).getPartitionOffsetMap())
      .containsExactly(Partition.of(0L), cappedEnd, Partition.of(1L), emptyEnd);
}
}