feat: Major classes for Spark continuous streaming #396
...k-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/MultiPartitionCommitter.java
...spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java
private final MultiPartitionCommitter committer;
private PslSourceOffset startOffset;

public PslContinuousReader(PslDataSourceOptions options) {
This doesn't belong here; it is wiring code. Remove this constructor.
I would like to have this so I could keep code in PslDataSource super simple.
AdminServiceClient adminServiceClient,
CursorServiceClient cursorServiceClient,
MultiPartitionCommitter committer) {
this.options = options;
Don't take the options; take the things you need from them.
This is needed at the lower level (PslContinuousInputPartition), which uses many parameters from the options (subPath, flowctrl, creds), so I'd prefer not to disassemble it early.
assert PslSourceOffset.class.isAssignableFrom(start.get().getClass())
: "start offset is not assignable to PslSourceOffset.";
startOffset = (PslSourceOffset) start.get();
return;
Do these methods need to be thread safe?
No, they are called serially, in this order: deserializeOffset (if the write-ahead log has any offset from a previous query), setStartOffset, getStartOffset, planInputPartitions, then the actual processing, then mergeOffsets and commit.
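To make the call order described above concrete, here is a minimal, self-contained sketch. `SerialReaderSketch` is a hypothetical stand-in that uses plain `long` offsets; it does not implement the Spark reader interface and is not the code in this PR. It only illustrates why unsynchronized fields are acceptable when a single caller invokes the methods one after another.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the reader; names mirror the methods mentioned in
// the comment above, but the signatures are simplified for illustration.
class SerialReaderSketch {
  private final List<String> calls = new ArrayList<>();
  // Plain fields without synchronization: safe only because the caller
  // invokes these methods serially from a single thread.
  private long startOffset = 0L;
  private long[] committed = new long[0];

  long deserializeOffset(String json) {
    calls.add("deserializeOffset");
    return Long.parseLong(json);
  }

  void setStartOffset(Optional<Long> start) {
    calls.add("setStartOffset");
    startOffset = start.orElse(0L);
  }

  long getStartOffset() {
    calls.add("getStartOffset");
    return startOffset;
  }

  int planInputPartitions(int partitionCount) {
    calls.add("planInputPartitions");
    return partitionCount;
  }

  // Combines per-partition offsets into one global offset (here: a copy).
  long[] mergeOffsets(long[] partitionOffsets) {
    calls.add("mergeOffsets");
    return partitionOffsets.clone();
  }

  void commit(long[] end) {
    calls.add("commit");
    committed = end.clone();
  }

  List<String> calls() {
    return calls;
  }

  // Drives one cycle in the order described in the review comment.
  static List<String> runOnce(Optional<String> writeAheadLogEntry) {
    SerialReaderSketch reader = new SerialReaderSketch();
    Optional<Long> start = writeAheadLogEntry.map(reader::deserializeOffset);
    reader.setStartOffset(start);
    long first = reader.getStartOffset();
    int partitions = reader.planInputPartitions(2);
    long[] perPartition = new long[partitions];
    java.util.Arrays.fill(perPartition, first + 10); // pretend work happened
    reader.commit(reader.mergeOffsets(perPartition));
    return reader.calls();
  }
}
```

When no write-ahead log entry exists, `deserializeOffset` is skipped and the cycle begins at `setStartOffset`.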
There is one more issue in this PR: the buffering pull subscriber keeps an unbounded internal message cache, so it does not respect flow control. Please ignore this for this PR; I have another PR that makes the cache bounded. EDIT: #408 is the PR that makes it bounded, but let's finish this one before I assign that.
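For readers unfamiliar with the concern, a bounded buffer can be sketched as follows. This is only an illustration built on `java.util.concurrent.ArrayBlockingQueue`; `BoundedMessageBuffer` is a hypothetical name, and nothing here is claimed to match how #408 or the buffering pull subscriber is implemented.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical bounded cache: once `capacity` messages are buffered, further
// adds are rejected until a consumer drains some, so memory use stays bounded.
class BoundedMessageBuffer {
  private final BlockingQueue<String> queue;

  BoundedMessageBuffer(int capacity) {
    this.queue = new ArrayBlockingQueue<>(capacity);
  }

  // Returns false instead of growing when the buffer is full.
  boolean tryAdd(String message) {
    return queue.offer(message);
  }

  // Returns the oldest buffered message, or null if the buffer is empty.
  String poll() {
    return queue.poll();
  }

  int size() {
    return queue.size();
  }
}
```

A producer that sees `tryAdd` return false would have to stop pulling until the consumer catches up, which is the behavior an unbounded cache cannot provide.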
@AutoValue.Builder
public abstract static class Builder {

public abstract Builder credentialsKey(String credentialsKey);
Nit: these methods should be named with a "set" prefix.
Done
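As an illustration of the naming convention requested above, here is a hand-written builder with a `set`-prefixed setter. `OptionsSketch` is a hypothetical class written without AutoValue so the example stays self-contained; it is not the generated code or the actual options class from this PR.

```java
// Hypothetical value class with a builder whose setter uses the "set" prefix.
final class OptionsSketch {
  private final String credentialsKey;

  private OptionsSketch(String credentialsKey) {
    this.credentialsKey = credentialsKey;
  }

  String credentialsKey() {
    return credentialsKey;
  }

  static Builder builder() {
    return new Builder();
  }

  static final class Builder {
    private String credentialsKey;

    // Named setCredentialsKey rather than credentialsKey, so the setter is
    // easy to tell apart from the getter on the value class.
    Builder setCredentialsKey(String credentialsKey) {
      this.credentialsKey = credentialsKey;
      return this;
    }

    OptionsSketch build() {
      if (credentialsKey == null) {
        throw new IllegalStateException("credentialsKey must be set");
      }
      return new OptionsSketch(credentialsKey);
    }
  }
}
```

Usage would then read `OptionsSketch.builder().setCredentialsKey("key").build()`.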
Rebase this, fix the few comments, and then ping me back for approval
…continuous-processing
Codecov Report
@@             Coverage Diff              @@
##             master     #396      +/-   ##
============================================
- Coverage     72.10%   71.63%   -0.47%
- Complexity      845      871      +26
============================================
  Files           158      163       +5
  Lines          4399     4573     +174
  Branches        222      226       +4
============================================
+ Hits           3172     3276     +104
- Misses         1104     1168      +64
- Partials        123      129       +6
This implements the major classes for Spark continuous streaming.