
feat: Add max message per batch option #14

Merged
merged 11 commits into from Jan 29, 2021


jiangmichaellll (Contributor) commented Jan 21, 2021

PSL flow control alone is not enough to keep Spark executors from running out of memory (OOM) when a batch is large, for example when there is a large backlog. This PR adds an option to hard-limit the maximum number of messages per batch.
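A minimal sketch of how such a cap bounds batch size, assuming each partition's batch end offset is clamped to `min(headOffset, startOffset + cap)`. The class and method names are hypothetical, the option key `pubsublite.flowcontrol.maxmessagesperbatch` with an "unlimited" default is an assumption to check against the connector's documentation, and splitting the total cap evenly across partitions is a design choice of this sketch, not necessarily what the PR does.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: clamps each partition's batch end offset to a per-batch message cap. */
final class BatchEndOffsets {
  // Assumed option key and default (no cap); not confirmed by this PR.
  static final String MAX_MESSAGES_PER_BATCH_KEY = "pubsublite.flowcontrol.maxmessagesperbatch";
  static final long DEFAULT_MAX_MESSAGES_PER_BATCH = Long.MAX_VALUE;

  private BatchEndOffsets() {}

  /** Reads the cap from data source options, falling back to "unlimited". */
  static long maxMessagesPerBatch(Map<String, String> options) {
    String raw = options.get(MAX_MESSAGES_PER_BATCH_KEY);
    return raw == null ? DEFAULT_MAX_MESSAGES_PER_BATCH : Long.parseLong(raw);
  }

  /**
   * start holds the next offset to read and head the next offset to be written, per partition,
   * so a batch covers [start, end). head must contain every partition in start.
   * The total cap is split evenly across partitions.
   */
  static Map<Integer, Long> endOffsets(
      Map<Integer, Long> start, Map<Integer, Long> head, long maxMessagesPerBatch) {
    int partitions = start.size();
    // At least one offset per partition so that every partition can make progress.
    long perPartition = Math.max(1L, maxMessagesPerBatch / Math.max(1, partitions));
    Map<Integer, Long> end = new HashMap<>();
    for (Map.Entry<Integer, Long> e : start.entrySet()) {
      long s = e.getValue();
      long h = head.get(e.getKey());
      // Guard against overflow when the cap is Long.MAX_VALUE.
      long capped = s > Long.MAX_VALUE - perPartition ? Long.MAX_VALUE : s + perPartition;
      end.put(e.getKey(), Math.min(h, capped));
    }
    return end;
  }

  public static void main(String[] args) {
    Map<Integer, Long> start = Map.of(0, 0L, 1, 100L);
    Map<Integer, Long> head = Map.of(0, 1_000_000L, 1, 150L);
    long cap = maxMessagesPerBatch(Map.of(MAX_MESSAGES_PER_BATCH_KEY, "1000"));
    // Partition 0 (large backlog) is capped at 0 + 500; partition 1 stops at its head, 150.
    System.out.println(endOffsets(start, head, cap));
  }
}
```

With a cap smaller than the partition count, each partition in this sketch still advances by one offset, so the total can slightly exceed the cap; that trade-off keeps every partition moving.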

Note that startOffset + batch_offset_range might land on an offset that holds no message. Assuming headOffset - 1 is a message offset (the only exception is when a broker shuts down ungracefully and no new message arrives to unblock the reader), a read up to startOffset + batch_offset_range will still be unblocked even if that offset is a non-message offset.
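A hypothetical sketch of why a non-message end offset does not block the read, assuming the batch covers `[start, end)` and the reader treats the range as complete once it observes any offset at or beyond `end - 1`. Because `end <= headOffset` and `headOffset - 1` is assumed to be a message offset, such a message always exists. The class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical sketch: a batch reader over [start, end) when some offsets carry no message. */
final class GapTolerantBatchRead {
  private GapTolerantBatchRead() {}

  /**
   * Consumes message offsets in increasing order and returns those in [start, end).
   * The read completes once any offset >= end - 1 is observed, so it never waits on
   * end - 1 itself; a later message (e.g. at head - 1) is enough to unblock it.
   */
  static List<Long> readBatch(Iterator<Long> messageOffsets, long start, long end) {
    List<Long> batch = new ArrayList<>();
    if (end <= start) {
      return batch; // Empty range: nothing to wait for.
    }
    while (messageOffsets.hasNext()) {
      long offset = messageOffsets.next();
      if (offset >= start && offset < end) {
        batch.add(offset);
      }
      if (offset >= end - 1) {
        return batch; // Reached or passed the last offset in the range.
      }
    }
    // Stream ran out before the range was covered; a real reader would block here
    // (the ungraceful-shutdown corner case).
    throw new IllegalStateException("range not yet covered; reader would block");
  }

  public static void main(String[] args) {
    // Offsets 3 and 4 carry no message; head is 8 and head - 1 = 7 is a message offset.
    List<Long> stream = List.of(0L, 1L, 2L, 5L, 6L, 7L);
    // With end = 5, end - 1 = 4 is a non-message offset, yet the read finishes at offset 5.
    System.out.println(readBatch(stream.iterator(), 0L, 5L)); // prints [0, 1, 2]
  }
}
```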

Tested in staging: with this new option set, executors no longer OOM.

EDIT: Kafka actually has the same parameter: https://screenshot.googleplex.com/57nu34SUfMvcRr5.

jiangmichaellll requested a review from a team as a code owner on January 21, 2021 23:02
google-cla bot added the label "api: pubsublite" (Issues related to the googleapis/java-pubsublite-spark API.) on Jan 21, 2021
product-auto-label bot added the label "api: pubsublite" (Issues related to the googleapis/java-pubsublite-spark API.) on Jan 21, 2021
jiangmichaellll changed the title from "feat: Add batch offset range control" to "feat: Add max message per batch option" on Jan 26, 2021
codecov bot commented Jan 26, 2021

Codecov Report

Merging #14 (baf5693) into master (bcbd25e) will increase coverage by 1.02%.
The diff coverage is 67.64%.


@@             Coverage Diff              @@
##             master      #14      +/-   ##
============================================
+ Coverage     57.25%   58.28%   +1.02%     
- Complexity       79       81       +2     
============================================
  Files            17       17              
  Lines           503      525      +22     
  Branches         16       18       +2     
============================================
+ Hits            288      306      +18     
- Misses          212      216       +4     
  Partials          3        3              
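As a quick check of the figures above, a worked calculation assuming Codecov's usual definition, in which partially covered lines count against coverage:

```latex
\begin{aligned}
\text{coverage} &= \frac{\text{hits}}{\text{hits} + \text{misses} + \text{partials}} \\
\text{master (bcbd25e):}\quad \frac{288}{288 + 212 + 3} &= \frac{288}{503} \approx 57.2565\% \\
\text{\#14 (baf5693):}\quad \frac{306}{306 + 216 + 3} &= \frac{306}{525} \approx 58.2857\% \\
\Delta &\approx 58.2857\% - 57.2565\% \approx 1.0292\%
\end{aligned}
```

The report's 57.25%, 58.28% and +1.02% match these ratios truncated (not rounded) to two decimals; the truncation is inferred from the numbers, not stated in the report.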
| Impacted Files | Coverage Δ | Complexity Δ |
|---|---|---|
| ...m/google/cloud/pubsublite/spark/PslDataSource.java | 0.00% <0.00%> (ø) | 0.00 <0.00> (ø) |
| ...blite/spark/PslMicroBatchInputPartitionReader.java | 58.97% <0.00%> (-8.68%) | 7.00 <0.00> (ø) |
| ...e/cloud/pubsublite/spark/PslDataSourceOptions.java | 11.36% <60.00%> (+5.19%) | 2.00 <0.00> (ø) |
| ...a/com/google/cloud/pubsublite/spark/Constants.java | 95.23% <100.00%> (+0.23%) | 1.00 <0.00> (ø) |
| ...le/cloud/pubsublite/spark/PslMicroBatchReader.java | 89.58% <100.00%> (+0.94%) | 11.00 <0.00> (ø) |
| ...m/google/cloud/pubsublite/spark/PslSparkUtils.java | 95.71% <100.00%> (+0.71%) | 14.00 <2.00> (+2.00) |
| ...ogle/cloud/pubsublite/spark/SparkSourceOffset.java | 100.00% <0.00%> (+8.51%) | 16.00% <0.00%> (ø%) |


Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update bcbd25e...baf5693.

Map<Partition, SparkPartitionOffset> map = new HashMap<>();
for (int i = 0; i < topicPartitionCount; i++) {
Partition p = Partition.of(i);
SparkPartitionOffset emptyPartition = SparkPartitionOffset.create(p, -1L);
Contributor

Not for this PR, but can you check that this doesn't error out when the partition is actually empty?

Contributor Author

Will do in a real test.
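A hypothetical sketch of why an empty partition need not error out, assuming (as the `-1L` in the snippet suggests) that the stored per-partition offset is the last consumed offset, so `-1` means nothing has been consumed yet and the next offset to read is `offset + 1`. With a head offset of 0 the computed range is simply empty. The class and method names are illustrative, not the connector's code.

```java
import java.util.Arrays;

/** Hypothetical sketch: batch range for a partition whose stored offset is the last consumed one. */
final class EmptyPartitionRange {
  private EmptyPartitionRange() {}

  /** Returns {start, end} with end exclusive; lastConsumed == -1 means "nothing consumed yet". */
  static long[] batchRange(long lastConsumed, long headOffset, long maxMessages) {
    if (lastConsumed < -1 || headOffset < 0 || maxMessages <= 0) {
      throw new IllegalArgumentException("invalid offsets or cap");
    }
    long start = lastConsumed + 1; // -1 maps to offset 0.
    long available = Math.max(0L, headOffset - start); // Messages not yet consumed.
    long count = Math.min(available, maxMessages); // Apply the per-batch cap.
    return new long[] {start, start + count}; // Never exceeds headOffset, so no overflow.
  }

  public static void main(String[] args) {
    // Empty partition: head is 0, so the range is [0, 0) and nothing is read.
    System.out.println(Arrays.toString(batchRange(-1L, 0L, 100L))); // prints [0, 0]
    // Partition with 50 messages, 20 already consumed, cap of 20 per batch.
    System.out.println(Arrays.toString(batchRange(19L, 50L, 20L))); // prints [20, 40]
  }
}
```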

dpcollins-google (Contributor) left a comment

Approved. Consider addressing the nits before submitting.

jiangmichaellll merged commit e9c640f into master on Jan 29, 2021
jiangmichaellll deleted the jiangmichael-batch-range branch on January 29, 2021 01:16
Labels: "api: pubsublite" (Issues related to the googleapis/java-pubsublite-spark API.), "cla: yes" (This human has signed the Contributor License Agreement.)

2 participants