
[SPARK-48330][SS][PYTHON] Fix the python streaming data source timeout issue for large trigger interval #46651

Closed (wants to merge 8 commits)

Conversation

chaoqin-li1123 (Contributor)

What changes were proposed in this pull request?

Fix the Python streaming data source timeout issue for large trigger intervals.
For the Python streaming source, keep the long-running worker architecture but set the socket timeout to infinity to avoid timeout errors.
For the Python streaming sink, since a StreamingWrite is also created per microbatch on the Scala side, a long-running worker cannot be attached to a StreamingWrite instance. Therefore we abandon the long-running worker architecture: the worker simply calls commit() or abort() and exits, and we let Spark reuse workers for us.
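The sink-side flow described above can be sketched as follows. This is a minimal illustration under assumed names, not the PR's actual code: `commit_or_abort` and `FakeWriter` are hypothetical stand-ins for the real writer plumbing in `pyspark.sql.worker`.

```python
from typing import List

# Minimal sketch (hypothetical names) of the per-microbatch sink worker
# flow: the short-lived worker performs exactly one commit() or abort()
# and then exits, so Spark's ordinary worker-reuse machinery handles
# pooling instead of a long-running worker holding the connection open.

def commit_or_abort(writer, commit_messages: List[object],
                    batch_id: int, should_abort: bool) -> None:
    if should_abort:
        writer.abort(commit_messages, batch_id)
    else:
        writer.commit(commit_messages, batch_id)


class FakeWriter:
    """Records which lifecycle calls the worker made."""

    def __init__(self) -> None:
        self.calls = []

    def commit(self, messages, batch_id):
        self.calls.append(("commit", batch_id))

    def abort(self, messages, batch_id):
        self.calls.append(("abort", batch_id))


writer = FakeWriter()
commit_or_abort(writer, [], batch_id=7, should_abort=False)
commit_or_abort(writer, [], batch_id=8, should_abort=True)
print(writer.calls)  # → [('commit', 7), ('abort', 8)]
```

Each invocation corresponds to one microbatch; the worker process would exit after the single call rather than looping.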

Why are the changes needed?

Currently we run a long-running Python worker process for the Python streaming source and sink to perform planning, commit, and abort on the driver side. Testing indicates that the current implementation causes connection timeout errors when the streaming query has a large trigger interval.
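The failure mode can be reproduced in isolation with plain sockets. This standalone sketch only illustrates the mechanism and is not Spark code: a finite timeout on a socket that sits idle longer than the timeout (as the worker socket does between widely spaced microbatches) raises `socket.timeout`, while `settimeout(None)` makes reads block indefinitely.

```python
import socket

# A socket pair where the "JVM side" (server) stays silent, standing in
# for a streaming query whose trigger interval exceeds the socket timeout.
server = socket.socket()
server.bind(("127.0.0.1", 0))
server.listen(1)
port = server.getsockname()[1]

client = socket.socket()
client.connect(("127.0.0.1", port))
conn, _ = server.accept()

client.settimeout(0.1)  # finite timeout, like the old worker socket
try:
    client.recv(1)       # peer sends nothing during the "trigger interval"
    timed_out = False
except socket.timeout:
    timed_out = True

client.settimeout(None)  # the fix: block indefinitely between microbatches

for s in (client, conn, server):
    s.close()

print(timed_out)  # → True
```

With `settimeout(None)` the same `recv` call would simply block until the peer writes, which is the behavior the long-running source worker needs.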

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added an integration test.

Was this patch authored or co-authored using generative AI tooling?

@HyukjinKwon (Member) left a comment:

Seems fine but cc @HeartSaVioR

@HeartSaVioR (Contributor) left a comment:

Looks OK - I don't follow the change of using PythonPlannerRunner, but @HyukjinKwon reviewed this so I defer to him. Left a few comments for better testing.

Btw, it looks like we keep the source runner as a long-lived one but make the sink runner short-lived. Is that for simplicity? I understand they are different; I just wanted to know it was thoughtfully decided.

.trigger(ProcessingTimeTrigger(20 * 1000))
.start(outputDir.getAbsolutePath)
eventually(timeout(waitTimeout * 5)) {
inputData.addData(1 to 3)
Contributor:

Do we intentionally add 1 to 3 into the source every 15 ms (the default)? You can just call addData a few times before eventually, as MemoryStream would produce the data for a single call as a single microbatch (2 calls = 2 batches).

Contributor Author:

A batch in MemoryStream actually doesn't correspond to a microbatch, but I figured out a way to do something similar.

.start(outputDir.getAbsolutePath)
eventually(timeout(waitTimeout * 5)) {
inputData.addData(1 to 3)
assert(q.lastProgress.batchId >= 2)
Contributor:

Since we have control over the input data once you apply my suggestion, why not check the output as well?

Contributor Author:

Change applied.

writer.abort(commit_messages, batch_id) # type: ignore[arg-type]
else:
writer.commit(commit_messages, batch_id) # type: ignore[arg-type]
# Send a status code back to JVM.
Contributor:

nit: indentation

Contributor Author:

Fixed.

@HeartSaVioR (Contributor):

> For the Python streaming sink, since a StreamingWrite is also created per microbatch on the Scala side, a long-running worker cannot be attached to a StreamingWrite instance. Therefore we abandon the long-running worker architecture: the worker simply calls commit() or abort() and exits, and we let Spark reuse workers for us.

Ah OK, it's unable to be reused anyway. Then makes sense.

@HeartSaVioR (Contributor) left a comment:

+1 pending CI

Main method for committing or aborting a data source streaming write operation.

This process is invoked from the `PythonStreamingSinkCommitRunner.runInPython`
method in the StreamingWrite implementation of the PythonTableProvider. It is
Contributor:

We don't have a PythonTableProvider. Do you mean PythonTable or PythonDataSourceV2?

Contributor Author:

Fixed.

@@ -39,78 +35,22 @@ import org.apache.spark.sql.types.StructType
* from the socket, then commit or abort a microbatch.
*/
class PythonStreamingSinkCommitRunner(
Contributor:

After this change, is the PythonStreamingSinkCommitRunner the same as the batch one now?

Contributor Author:

It is similar, except that the streaming commit runner also takes the batch id as a parameter and throws a different type of exception.

@@ -210,7 +210,8 @@ def main(infile: IO, outfile: IO) -> None:
# Read information about how to connect back to the JVM from the environment.
java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])
auth_secret = os.environ["PYTHON_WORKER_FACTORY_SECRET"]
(sock_file, _) = local_connect_and_auth(java_port, auth_secret)
(sock_file, sock) = local_connect_and_auth(java_port, auth_secret)
sock.settimeout(None)
Contributor:

nit: can we add a short comment here on why we need to set this timeout?

Contributor Author:

Comment added.

@HeartSaVioR (Contributor):

Let's do post-review if there are remaining comments. Looks like the change is right and unavoidable.

@HeartSaVioR (Contributor):

Thanks! Merging to master.
