feat: Compute head offset for Spark connector micro batch mode. #439

Merged
jiangmichaellll merged 9 commits into master from jiangmichael-spark-headoffset-reader on Jan 12, 2021

Contributor

@jiangmichaellll jiangmichaellll commented Jan 5, 2021

This PR does two things:

  1. Fixes a bug in micro batch mode: the end offset should be computed inside the reader rather than passed in as a fixed value.
  2. Uses a rate-limited head offset reader that refreshes the head offsets for each topic at most once per minute.
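
For illustration, the rate limiting in item 2 could look roughly like the sketch below. It is not the `LimitingHeadOffsetReader` added by this PR: the class name `RateLimitedHeadOffsets`, the `Supplier`-based fetcher, the injected clock, and the use of `Integer`/`Long` in place of `Partition`/`Offset` are all assumptions made so the example runs on the JDK alone.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical sketch: caches head offsets and refreshes them at most once per interval. */
final class RateLimitedHeadOffsets {
  // Assumed stand-in for the real head offset lookup (partition -> head offset).
  private final Supplier<Map<Integer, Long>> fetcher;
  // Injected clock so the refresh interval can be exercised without sleeping.
  private final LongSupplier clockMillis;
  private final long minRefreshMillis;

  private Map<Integer, Long> cached; // null until the first fetch
  private long lastRefreshMillis;

  RateLimitedHeadOffsets(
      Supplier<Map<Integer, Long>> fetcher, LongSupplier clockMillis, long minRefreshMillis) {
    this.fetcher = fetcher;
    this.clockMillis = clockMillis;
    this.minRefreshMillis = minRefreshMillis;
  }

  /** Returns the cached head offsets, calling the fetcher only if the interval has elapsed. */
  synchronized Map<Integer, Long> getHeadOffsets() {
    long now = clockMillis.getAsLong();
    if (cached == null || now - lastRefreshMillis >= minRefreshMillis) {
      cached = Collections.unmodifiableMap(new HashMap<>(fetcher.get()));
      lastRefreshMillis = now;
    }
    return cached;
  }
}
```

Under these assumptions, a micro batch reader would call `getHeadOffsets()` each time it plans a batch to obtain the end offset, rather than receiving a fixed end offset up front. With a 60,000 ms interval, the backing lookup runs at most once per minute no matter how often batches are planned.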

@jiangmichaellll jiangmichaellll requested a review from a team as a code owner January 5, 2021 20:11
@product-auto-label product-auto-label bot added the api: pubsublite Issues related to the googleapis/java-pubsublite API. label Jan 5, 2021
@google-cla google-cla bot added the cla: yes This human has signed the Contributor License Agreement. label Jan 5, 2021
codecov bot commented Jan 5, 2021

Codecov Report

Merging #439 (fa3e803) into master (2099751) will increase coverage by 0.08%.
The diff coverage is 42.10%.

@@             Coverage Diff              @@
##             master     #439      +/-   ##
============================================
+ Coverage     71.08%   71.16%   +0.08%     
- Complexity      914      916       +2     
============================================
  Files           167      168       +1     
  Lines          4831     4855      +24     
  Branches        243      246       +3     
============================================
+ Hits           3434     3455      +21     
- Misses         1257     1259       +2     
- Partials        140      141       +1     
| Impacted Files | Coverage Δ | Complexity Δ |
|---|---|---|
| ...le/cloud/pubsublite/internal/TopicStatsClient.java | 0.00% <ø> (ø) | 0.00 <0.00> (ø) |
| ...loud/pubsublite/internal/TopicStatsClientImpl.java | 66.66% <0.00%> (-33.34%) | 3.00 <0.00> (ø) |
| ...m/google/cloud/pubsublite/spark/PslDataSource.java | 0.00% <0.00%> (ø) | 0.00 <0.00> (ø) |
| ...e/cloud/pubsublite/spark/PslDataSourceOptions.java | 8.77% <0.00%> (-2.34%) | 2.00 <0.00> (ø) |
| ...oud/pubsublite/spark/LimitingHeadOffsetReader.java | 64.70% <64.70%> (ø) | 2.00 <2.00> (?) |
| ...le/cloud/pubsublite/spark/PslMicroBatchReader.java | 70.45% <100.00%> (-7.05%) | 12.00 <0.00> (ø) |
| ...ud/pubsublite/internal/wire/SubscriberBuilder.java | 40.74% <0.00%> (-7.09%) | 2.00% <0.00%> (ø%) |
| ...oud/pubsublite/internal/wire/PublisherBuilder.java | 62.26% <0.00%> (-6.49%) | 3.00% <0.00%> (ø%) |

... and 6 more

Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@jiangmichaellll jiangmichaellll requested a review from a team as a code owner January 6, 2021 20:33
Collaborator

@dpcollins-google dpcollins-google left a comment

LGTM once the following comments are addressed.

return cachedHeadOffsets.get(CACHE_KEY);
Map<Partition, Offset> partitionOffsetMap = new HashMap<>();
for (int i = 0; i < topicPartitionCount; i++) {
partitionOffsetMap.put(Partition.of(i), cachedHeadOffsets.get(Partition.of(i)));
Collaborator

You still need to do this asynchronously. As written, the fan-out across partitions serializes all of these lookups. Use AsyncLoadingCache.java and Futures.allAsList instead.
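
A rough sketch of the fan-out being asked for here: start every per-partition lookup first, then combine the results once all of them have completed. It uses the JDK's `CompletableFuture.allOf` as a stand-in for Guava's `Futures.allAsList` and leaves the cache out entirely. `HeadOffsetFanout`, `fetchAll`, and the `lookup` function are hypothetical names, and partitions and offsets are plain `Integer`/`Long` values rather than `Partition`/`Offset`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

/** Hypothetical sketch: issue all per-partition lookups before waiting on any of them. */
final class HeadOffsetFanout {
  private HeadOffsetFanout() {}

  static CompletableFuture<Map<Integer, Long>> fetchAll(
      int partitionCount, IntFunction<CompletableFuture<Long>> lookup) {
    // Start every lookup up front; nothing blocks inside this loop.
    List<CompletableFuture<Long>> futures = new ArrayList<>();
    for (int i = 0; i < partitionCount; i++) {
      futures.add(lookup.apply(i));
    }
    // allOf completes once every lookup has completed (or fails if any of them fails).
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
        .thenApply(
            ignored -> {
              Map<Integer, Long> result = new HashMap<>();
              for (int i = 0; i < futures.size(); i++) {
                // join() does not block here: every future is already complete.
                result.put(i, futures.get(i).join());
              }
              return result;
            });
  }
}
```

The synchronous loop in the diff waits for each partition before asking for the next one. Here the total wait is bounded by the slowest single lookup instead of the sum of all lookups.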

Contributor Author

I used Caffeine (https://github.com/ben-manes/caffeine), since the Guava lib is soft-deprecated (https://b.corp.google.com/issues/171496465, among many other places). I also need to pull in a future conversion helper (similar to https://source.corp.google.com/search?q=net.javacrumbs.futureconverter.java8guava.FutureConverter.toCompletableFuture).
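
A future conversion of this kind is usually a small adapter. The sketch below shows the general shape using only the JDK. `ListenerFuture` is a hypothetical stand-in for a listener-based future type, not the Guava, ApiFuture, or future-converter API, and `FutureAdapter.toCompletableFuture` is an illustrative name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/** Hypothetical listener-based future: invokes the listener once with a value or an error. */
interface ListenerFuture<T> {
  void addListener(BiConsumer<T, Throwable> listener);
}

/** Hypothetical adapter from the listener style to CompletableFuture. */
final class FutureAdapter {
  private FutureAdapter() {}

  static <T> CompletableFuture<T> toCompletableFuture(ListenerFuture<T> source) {
    CompletableFuture<T> target = new CompletableFuture<>();
    source.addListener(
        (value, error) -> {
          if (error != null) {
            // Propagate the failure so downstream stages see the original cause.
            target.completeExceptionally(error);
          } else {
            target.complete(value);
          }
        });
    return target;
  }
}
```

Once the lookup result is a `CompletableFuture`, it can be handed to an async cache loader or combined with other lookups without blocking a thread.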

@jiangmichaellll jiangmichaellll merged commit f484754 into master Jan 12, 2021
@jiangmichaellll jiangmichaellll deleted the jiangmichael-spark-headoffset-reader branch January 12, 2021 00:44