feat: Compute head offset for Spark connector micro batch mode. #439
Codecov Report
@@             Coverage Diff              @@
##             master     #439      +/-   ##
============================================
+ Coverage     71.08%   71.16%   +0.08%
- Complexity      914      916       +2
============================================
  Files           167      168       +1
  Lines          4831     4855      +24
  Branches        243      246       +3
============================================
+ Hits           3434     3455      +21
- Misses         1257     1259       +2
- Partials        140      141       +1
LGTM following comments
...le-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/TopicStatsClient.java
...-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader.java
...-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader.java
...-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader.java
...-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader.java
...-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PerTopicHeadOffsetReader.java
...park-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java
return cachedHeadOffsets.get(CACHE_KEY);
Map<Partition, Offset> partitionOffsetMap = new HashMap<>();
for (int i = 0; i < topicPartitionCount; i++) {
  partitionOffsetMap.put(Partition.of(i), cachedHeadOffsets.get(Partition.of(i)));
This still needs to be done asynchronously: as written, the fanout serializes all of these lookups. Use AsyncLoadingCache.java and Futures.allAsList instead.
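To illustrate the suggestion, here is a minimal sketch of the fan-out pattern using only the JDK. It is not the code from this PR: `HeadOffsetFanout`, `headOffsets`, and the `lookup` parameter are hypothetical names, plain `int` and `long` stand in for `Partition` and `Offset`, and `CompletableFuture.allOf` stands in for `Futures.allAsList`. The idea is to start every per-partition lookup first and only then combine the results, so no lookup waits on the previous one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

final class HeadOffsetFanout {
  private HeadOffsetFanout() {}

  /**
   * Starts one lookup per partition and completes once all of them have finished.
   * `lookup` is a hypothetical stand-in for an asynchronous cache get.
   */
  static CompletableFuture<Map<Integer, Long>> headOffsets(
      int partitionCount, IntFunction<CompletableFuture<Long>> lookup) {
    // Issue every lookup before waiting on any of them.
    List<CompletableFuture<Long>> pending = new ArrayList<>();
    for (int i = 0; i < partitionCount; i++) {
      pending.add(lookup.apply(i));
    }
    // allOf completes when every future has completed, and fails if any of them failed.
    return CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0]))
        .thenApply(
            ignored -> {
              Map<Integer, Long> result = new HashMap<>();
              for (int i = 0; i < partitionCount; i++) {
                // join() does not block here: the future is already complete.
                result.put(i, pending.get(i).join());
              }
              return result;
            });
  }
}
```

With a cache whose `get` returns a `CompletableFuture`, the `lookup` argument would simply be that `get` call; the caller then blocks (or chains further work) on the single combined future rather than on each partition in turn.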
I used Caffeine (https://github.com/ben-manes/caffeine), as the Guava lib was soft-deprecated (see https://b.corp.google.com/issues/171496465, among many other places). I also need to pull in a future conversion (similar to https://source.corp.google.com/search?q=net.javacrumbs.futureconverter.java8guava.FutureConverter.toCompletableFuture).
This PR does 2 things: