feat: Supports topic partition increase. #115
Codecov Report
@@             Coverage Diff              @@
##             master     #115      +/-   ##
============================================
- Coverage     59.46%   58.96%   -0.51%
- Complexity       82       92      +10
============================================
  Files            17       18       +1
  Lines           528      580      +52
  Branches         18       24       +6
============================================
+ Hits            314      342      +28
- Misses          210      232      +22
- Partials          4        6       +2
Thanks for this, MJ! Mostly looks good; a few comments.
Review threads (resolved):
src/main/java/com/google/cloud/pubsublite/spark/CachedPartitionCountReader.java
src/main/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader.java
src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitterImpl.java (3 threads)
src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java (2 threads)
src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java
Hi Tianzi, could you help review the clirr-ignored-differences.xml change? Thanks!
LGTM, but I'm not sure how to use it.
This adds support for topic partition increases in both micro batch and continuous modes.
CachedPartitionCountReader caches the topic's partition count and refreshes it at most once every 10 seconds, which is well within the admin read limit of 600 requests per minute. Spark does not need a consistent read here; the count only has to be eventually consistent.

For micro batch, the CachedPartitionCountReader is embedded inside HeadOffsetReader. Within each batch's lifecycle, the partition count is read once, and that value serves as the partition count for the rest of the batch. It is carried implicitly in the batch's endOffset.

For continuous mode, the partition count is set once per ContinuousReader. Once needsReconfiguration() detects an updated value, Spark constructs a new ContinuousReader with the updated count.
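A minimal sketch of the refresh-and-pin pattern described above, under stated assumptions. CachedPartitionCount, PinnedPartitionReader and PartitionCountDemo are hypothetical names, not the connector's classes; the admin read is stubbed with an IntSupplier, and the clock is injectable so the 10 s refresh interval can be exercised without sleeping. At one refresh per 10 s, each cache instance issues roughly 6 admin reads per minute, far below the 600/min limit.

```java
import java.util.function.IntSupplier;
import java.util.function.LongSupplier;

// Hypothetical sketch (not the connector's code): caches a topic's partition count
// and refetches it only after refreshIntervalMillis has elapsed.
final class CachedPartitionCount {
  private final IntSupplier fetchPartitionCount; // assumed stand-in for the admin read
  private final LongSupplier clockMillis;        // injectable clock, e.g. System::currentTimeMillis
  private final long refreshIntervalMillis;

  private boolean initialized = false;
  private int cachedCount;
  private long lastFetchMillis;

  CachedPartitionCount(
      IntSupplier fetchPartitionCount, LongSupplier clockMillis, long refreshIntervalMillis) {
    if (refreshIntervalMillis <= 0) {
      throw new IllegalArgumentException("refreshIntervalMillis must be positive");
    }
    this.fetchPartitionCount = fetchPartitionCount;
    this.clockMillis = clockMillis;
    this.refreshIntervalMillis = refreshIntervalMillis;
  }

  // Returns the cached count. It may be stale by up to one refresh interval,
  // which is acceptable because only eventual consistency is required.
  synchronized int get() {
    long now = clockMillis.getAsLong();
    if (!initialized || now - lastFetchMillis >= refreshIntervalMillis) {
      cachedCount = fetchPartitionCount.getAsInt();
      lastFetchMillis = now;
      initialized = true;
    }
    return cachedCount;
  }
}

// Hypothetical continuous-mode reader: pins the partition count once at construction.
final class PinnedPartitionReader {
  private final CachedPartitionCount partitionCount;
  private final int pinnedCount;

  PinnedPartitionReader(CachedPartitionCount partitionCount) {
    this.partitionCount = partitionCount;
    this.pinnedCount = partitionCount.get();
  }

  int pinnedCount() {
    return pinnedCount;
  }

  // Analogous to needsReconfiguration(): true once the cached count differs from the pinned one.
  boolean needsReconfiguration() {
    return partitionCount.get() != pinnedCount;
  }
}

class PartitionCountDemo {
  public static void main(String[] args) {
    long[] now = {0L};
    int[] topicPartitions = {2};
    CachedPartitionCount cache =
        new CachedPartitionCount(() -> topicPartitions[0], () -> now[0], 10_000L);

    PinnedPartitionReader reader = new PinnedPartitionReader(cache); // pins 2 at t=0
    topicPartitions[0] = 4; // partitions are increased on the server

    now[0] = 5_000L;
    System.out.println(reader.needsReconfiguration()); // false: cache still serves 2

    now[0] = 10_000L;
    System.out.println(reader.needsReconfiguration()); // true: refreshed count is 4

    reader = new PinnedPartitionReader(cache); // rebuild the reader, as Spark would
    System.out.println(reader.pinnedCount()); // 4
  }
}
```

Pinning the count when a reader is built and comparing it against a slightly stale cache lets a partition increase surface within about one refresh interval, without issuing an admin read on every call.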