Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35157][runtime] Sources with watermark alignment get stuck once some subtasks finish #24757

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

elon-X
Copy link
Contributor

@elon-X elon-X commented May 6, 2024

What is the purpose of the change

Sources with watermark alignment get stuck once some subtasks finish, this PR solves this problem.

Brief change log

while some subtasks have been finished, the SourceOperator send Long.MAX_VALUE to SourceCoordinator, and SourceCoordinator checks whether subtasks have been finished before sending the event.

Verifying this change

This change added tests and can be verified as follows:

  • org.apache.flink.streaming.api.operators.SourceOperatorAlignmentTest::testWatermarkAlignmentWhileSubtaskFinished()
  • org.apache.flink.runtime.source.coordinator.SourceCoordinatorAlignmentTest::testWatermarkAlignmentWhileSubtaskFinished()

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no

@flinkbot
Copy link
Collaborator

flinkbot commented May 6, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@elon-X
Copy link
Contributor Author

elon-X commented May 7, 2024

hi, @1996fanrui would you mind reviewing this for me when you have a moment? Thank you very much!

@1996fanrui 1996fanrui self-requested a review May 10, 2024 08:10
@1996fanrui 1996fanrui self-assigned this May 10, 2024
Copy link
Member

@1996fanrui 1996fanrui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @elon-X , I'm wondering if "NoMoreSplitsEvent" would be appropriate to check if a subtask is finished.

The Split can be discovered dynamically. IIRC, kafka source subtask won't be FINISHED even if it doesn't have split for Flink Streaming Job. (This subtask may be assigned split after adding new kafka partition)

I prefer Gyula proposed solution: The solution could be to send out a max watermark event once the sources finish or to exclude them from the source coordinator.

We need to find a proper code place(source is definitely finished) to send MaxTimestamp.

I guess

may be a suitable place.

context.sendEventToSourceOperatorIfTaskReady(
subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
// when subtask have been finished, do not send event.
if (!context.hasNoMoreSplits(subtaskId)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does NoMoreSplits mean subtask has been finished?

I'm afraid they are not same. For example, some subtask doesn't have split when the parallelism of kafka source is greater than kafka partition.

I'm not sure do we need this change. IIUC, inside of context.sendEventToSourceOperatorIfTaskReady has a check : do send event when gateway is not null.

It means coordinator doesn't send event to subtask when subtask is finished, right?

Please correct me if anything is wrong, thanks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @1996fanrui ,
First of all, thank you for your detailed review and response. I would like to share my thoughts:

1.Regarding the SourceOperator, initially, I considered the issue of code positioning. I noticed that before switching to the DataInputStatus.END_OF_INPUT state, the SourceOperator receives a NoMoreSplitsEvent, and the SourceCoordinator also needs to determine whether the subtask has finished or not. Therefore, I chose to send the maximum timestamp within the if (event instanceof NoMoreSplitsEvent) { } I believe these two locations should be equivalent.

2.On the SourceCoordinator side, if a task has finished, the context.sendEventToSourceOperatorIfTaskReady code still sends events to that subtask. In the code provided by Gyula, if the response result from gateway.sendEvent(event).get() is obtained, the following exception occurs:
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.operators.coordination.TaskNotRunningException: "Source: Sequence Source -> Filter -> Sink: Print to Std. Out (1/2)" is not running, but in state FINISHED

Therefore, I added the condition !context.hasNoMoreSplits(subtaskId). I also tested the scenario "some subtask doesn't have split when the parallelism of Kafka source is greater than Kafka partition," and indeed, no NoMoreSplitsEvent is sent.

Please correct me if my understanding is wrong. I will modify the code based on the final discussion results and submit it~

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe these two locations should be equivalent.

I think you are right, both of DataInputStatus.END_OF_INPUT and NoMoreSplitsEvent work well for now.

I prefer DataInputStatus.END_OF_INPUT because its semantics are clearer, and it's closer to FINISHED. It means the task will be definitely switched to FINISHED after DataInputStatus.END_OF_INPUT.

As we all know, code is often refactored. We shouldn't make assumptions (especially assumptions that other developers don't know about). Other developers may break our assumption if they don't know during the refactor in the future. I'm afraid the NoMoreSplitsEvent is used for other cases, and task doesn't switch to FINISHED after receiving NoMoreSplitsEvent in the future. That's my concern.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, would you mind adding the demo from FLINK-35157 as an ITCase to check whether the watermark alignment works well with finished task?

This ITCase will fail or timeout(as described by FLINK-35157: source is blocked) if another developer breaks this feature accidentally. WDYT?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants