Update with external checkpoints #199

Open
wants to merge 7 commits into master

Conversation

@davidbirdsong commented May 4, 2020

A draft to implement new functionality proposed in #197.

Adds values to the CRD to support conveying that the savepoint step during an upgrade can be performed with a checkpoint instead.

Checkpoints run constantly in the background, so a 'recent enough' checkpoint must exist, with that recency being configurable.

@@ -58,6 +59,7 @@ type FlinkApplicationSpec struct {
AllowNonRestoredState bool `json:"allowNonRestoredState,omitempty"`
ForceRollback bool `json:"forceRollback"`
MaxCheckpointRestoreAgeSeconds *int32 `json:"maxCheckpointRestoreAgeSeconds,omitempty"`
MaxCheckpointDeployAgeSeconds *int32 `json:"maxCheckpointDeployAgeSeconds,omitempty"`
Contributor

Is MaxCheckpointDeployAgeSeconds functionally different from the MaxCheckpointRestoreAgeSeconds?

Author

Not functionally, no. My thinking was that these checkpoint age values could have very different acceptability thresholds, since MaxCheckpointDeployAgeSeconds would be looking for an ideal, possibly short window. I may have read the code wrong, but my understanding was that by failing this age check, the operator would effectively poll the JM API waiting for the next checkpoint to run and complete (we have ours checkpointing frequently).

I may have read the state machine handler incorrectly though.
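
For illustration, a rough sketch of the age check being described, using hypothetical helper and parameter names rather than the PR's actual code:

```go
package flink

import "time"

// isCheckpointFreshEnough reports whether an externalized checkpoint is recent
// enough to deploy from. When it returns false, the handler can return with the
// state machine unchanged, so the operator re-enters it on the next reconcile and
// effectively polls the JM API until a fresh enough checkpoint completes.
func isCheckpointFreshEnough(triggerTimeMillis int64, maxAgeSeconds *int32, now time.Time) bool {
	if maxAgeSeconds == nil {
		return true // no recency requirement configured
	}
	triggered := time.Unix(triggerTimeMillis/1000, 0)
	return now.Sub(triggered) <= time.Duration(*maxAgeSeconds)*time.Second
}
```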

@glaksh100 (Contributor)

Hi @davidbirdsong ! Thank you for creating this PR.

I have one overall comment/suggestion on the approach. Including the functionality you have in this PR, we now have three ways to perform an update: with savepoint, without savepoint, and with checkpoint. I'm wondering if we can consolidate these options into one field (maybe something called UpdateMode)?

We will still have to support the existing flags for backward compatibility. Thoughts?
cc. @mwylde

@davidbirdsong commented May 11, 2020


I figured an additive approach was much easier with respect to backwards compatibility, since savepoints are part of job cancellation while checkpoints are not, so the supporting behavior for savepoints and checkpoints would be very different.

I'm open to unifying the functionality/implementation, but I'd want some guidance/collaboration, since folding new features into existing ones while maintaining backwards compatibility is trickier. I'd rather not guess at what's palatable to the core devs.
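
To make the distinction concrete: cancel-with-savepoint writes state as part of stopping the job, while a checkpoint-based update relies on state that was already externalized beforehand. A rough sketch against Flink's REST API (the endpoint paths come from the Flink REST documentation; the surrounding code is illustrative only):

```go
package flink

import (
	"bytes"
	"fmt"
	"net/http"
)

// Cancel with savepoint: the savepoint is written as part of the cancellation itself.
func cancelWithSavepoint(jmURL, jobID, targetDir string) (*http.Response, error) {
	body := fmt.Sprintf(`{"cancel-job": true, "target-directory": %q}`, targetDir)
	return http.Post(jmURL+"/jobs/"+jobID+"/savepoints", "application/json", bytes.NewBufferString(body))
}

// Plain cancellation: no state is written here; a checkpoint-based update instead
// depends on an externalized checkpoint that was retained before the job went away.
func cancelJob(jmURL, jobID string) (*http.Response, error) {
	req, err := http.NewRequest(http.MethodPatch, jmURL+"/jobs/"+jobID+"?mode=cancel", nil)
	if err != nil {
		return nil, err
	}
	return http.DefaultClient.Do(req)
}
```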

savepointJobSuccessCounter: labeled.NewCounter("savepoint_job_success", "Savepoint job request succeeded", flinkJmClientScope),
savepointJobFailureCounter: labeled.NewCounter("savepoint_job_failed", "Savepoint job request failed", flinkJmClientScope),
getCheckpointsConfigSuccessCounter: labeled.NewCounter("get_checkpoints_config_success", "Get checkpoint config request succeeded", flinkJmClientScope),
getCheckpointsConfigFailureCounter: labeled.NewCounter("get_checkpoints_config_failed", "Get checkpoint config request failed", flinkJmClientScope),
}
Author

I love go fmt, but it can also be disruptive, bloating PRs with changes and hiding the actual diff.

I could add some line breaks to separate out the new, longer struct member and minimize the line change count on things like this.

Contributor

Agreed. Not a problem, ok to leave as is :)

@davidbirdsong marked this pull request as ready for review May 14, 2020 16:28
@glaksh100 (Contributor)

Sorry for the delay here @davidbirdsong ! Thanks for addressing the review comments. I am leaning toward the approach where we introduce an UpdateMode, which can default to savepoint and explicitly select either of the other two approaches (checkpoint or savepointDisabled). The only flag we'd need to retain for backward compatibility would be savepointDisabled, which IMO is okay. Let me know what you think!

cc/ @anandswaminathan @mwylde

@davidbirdsong (Author)


Can you advise on how to handle the coupling of needing both of the following to be true:

  • externalized checkpoints enabled
  • RETAIN_ON_CANCELLATION set on the job env itself

Without these, any checkpoint-based update will fail. For this PR, I kept it simple by returning from handleApplicationSavepointingWithCheckpoint with the state machine unchanged, but I'm not clear on what good behavior should be. The requirement and the setting itself live far apart (operator config vs. the app itself), which could lead to a deadlock of sorts. What do you recommend?
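
To make the guard concrete, something along these lines could be checked up front (a sketch only; the JSON field names approximate the JobManager's /jobs/:jobid/checkpoints/config response, and the operator's real client types differ):

```go
package flink

import "errors"

// checkpointExternalization approximates the "externalization" section of the
// JobManager's checkpoint config response.
type checkpointExternalization struct {
	Enabled              bool `json:"enabled"`
	DeleteOnCancellation bool `json:"delete_on_cancellation"`
}

// validateRetainedCheckpoints is a hypothetical guard: a checkpoint-based update
// only works if the job externalizes checkpoints and retains them on cancellation
// (RETAIN_ON_CANCELLATION); otherwise the checkpoint is deleted as soon as the old
// job is cancelled and there is nothing left to restore from.
func validateRetainedCheckpoints(ext checkpointExternalization) error {
	if !ext.Enabled {
		return errors.New("externalized checkpoints are not enabled for this job")
	}
	if ext.DeleteOnCancellation {
		return errors.New("externalized checkpoints are not retained on cancellation")
	}
	return nil
}
```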

UpdateMode will allow for 3 different actions taken before the old application is cancelled:
- savepoint
- checkpoint
- no state saved (taking over existing SavepointDisabled flag)
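
A minimal sketch of what that field could look like on the spec; the constant names here are illustrative, with the empty value defaulting to savepoint-based updates for backward compatibility:

```go
package v1beta1

// UpdateMode selects what happens to job state before the old application is
// cancelled during an update. An empty value behaves like Savepoint so existing
// specs keep their current behavior.
type UpdateMode string

const (
	UpdateModeSavepoint  UpdateMode = "Savepoint"  // cancel with savepoint (default)
	UpdateModeCheckpoint UpdateMode = "Checkpoint" // restore from a recent externalized checkpoint
	UpdateModeNone       UpdateMode = "None"       // no state saved (covers the SavepointDisabled flag)
)
```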
@glaksh100 (Contributor)

Hey @davidbirdsong ! Thanks for the updates. I will review this PR in the next couple of days :)

savepointpath can be supplied or found dynamically, helps with debugpath to observe which one is used
@glaksh100 left a comment

Thanks for the PR @davidbirdsong ! I took a stab at an initial review and left a few comments. Once we work through some design discussions on the PR, it would also be nice to have an integration test covering this feature.

// Blue Green deployment and no savepoint required implies, we directly transition to submitting job
s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob)
} else {
s.updateApplicationPhase(application, v1beta1.FlinkApplicationSavepointing)
}
return statusChanged, nil
}
func (s *FlinkStateMachine) handleApplicationSavepointingWithCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) {
Contributor

I think there's scope to abstract out some common code between this method and handleApplicationRecovering()?

Contributor

Also minor suggestion: could we rename this method to not include the term Savepoint (since it's not really using a savepoint)?


}

func (f *Controller) FindExternalizedCheckpointForSavepoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) {
checkpointConfig, err := f.flinkClient.GetCheckpointConfig(ctx, f.getURLFromApp(application, hash), application.Status.JobStatus.JobID)
Contributor

Is there opportunity to combine FindExternalizedCheckpoint and FindExternalizedCheckpointForSavepoint into a single method? I believe that the checkpoint configuration check (to ensure it's RETAIN_ON_CANCELLATION) is applicable to both cases.
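
For example, a single lookup could take the age constraint as a parameter and always perform the retention check. The shapes below are assumptions made for the sketch, not the operator's actual types:

```go
package flink

import (
	"context"
	"errors"
	"time"
)

// Minimal shapes assumed for the sketch.
type checkpointConfig struct {
	ExternalizationEnabled bool
	DeleteOnCancellation   bool
}
type checkpoint struct {
	Path             string
	TriggerTimestamp int64 // epoch millis
}
type checkpointClient interface {
	GetCheckpointConfig(ctx context.Context, jobID string) (*checkpointConfig, error)
	GetLatestCheckpoint(ctx context.Context, jobID string) (*checkpoint, error)
}

// findExternalizedCheckpoint unifies the two lookups: it always verifies that
// externalized checkpoints are retained on cancellation, and optionally enforces a
// maximum age (nil means any age is acceptable). An empty path with a nil error
// means "nothing usable yet"; callers can retry on the next reconcile.
func findExternalizedCheckpoint(ctx context.Context, c checkpointClient, jobID string, maxAgeSeconds *int32) (string, error) {
	cfg, err := c.GetCheckpointConfig(ctx, jobID)
	if err != nil {
		return "", err
	}
	if !cfg.ExternalizationEnabled || cfg.DeleteOnCancellation {
		return "", errors.New("job does not retain externalized checkpoints on cancellation")
	}
	cp, err := c.GetLatestCheckpoint(ctx, jobID)
	if err != nil || cp == nil {
		return "", err
	}
	if maxAgeSeconds != nil {
		age := time.Since(time.Unix(cp.TriggerTimestamp/1000, 0))
		if age > time.Duration(*maxAgeSeconds)*time.Second {
			return "", nil // too old for this deploy; wait for a fresher one
		}
	}
	return cp.Path, nil
}
```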

@@ -342,6 +362,11 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a
s.updateApplicationPhase(application, v1beta1.FlinkApplicationRecovering)
return statusChanged, nil
}
// use of checkpoints in the place of savepoints
if application.Spec.UpdateMode == v1beta1.UpdateModeCheckpoint {
return s.handleApplicationSavepointingWithCheckpoint(ctx, application)
Contributor

Do you think it would make sense to make the restoration from a checkpoint an explicit state in the state machine?

Author

Hmm, my gut was that it doesn't warrant it, but since there's a max age and a window where the controller is polling for the next, fresher checkpoint, the controller might do well to expose that state and have a clearer exit condition. I can prototype it, if you'd like.

Contributor

Yeah I agree that it would be clearer to expose it as a state. Happy to help review the prototype!
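
A sketch of what an explicit phase could look like; FlinkApplicationSavepointing and FlinkApplicationRecovering already exist in the operator, while the new constant and its name are hypothetical:

```go
package v1beta1

// FlinkApplicationPhase mirrors the operator's existing phase type for this sketch.
type FlinkApplicationPhase string

// A dedicated phase would make the "waiting for a fresh enough externalized
// checkpoint" window visible in the application status, with a clear exit into
// SubmittingJob once a usable checkpoint is found (or a rollback path if none
// appears within the configured window).
const FlinkApplicationCheckpointing FlinkApplicationPhase = "Checkpointing"
```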

@davidbirdsong commented Jul 17, 2020

Another thing that occurred to me: how a non-empty savepointPath and updateMode interact. My initial thinking is that a non-empty savepointPath takes precedence over updateMode, since updateMode mostly influences the steps taken prior to cancelling a job and transitioning to SubmittingJob. Also, the way I interpret its reason for existence is to let an operator force a new job to start from a known position: it affords the operator a hard-edged control.

My thinking is that when savepointPath is non-empty, all codepaths concerned with fetching a recent checkpoint or cancelling with a savepoint can move directly to SubmittingJob. How does that sound?

@glaksh100 (Contributor)

That's a very good point! I don't believe we'd want the updateMode and savepointPath to interact at all in fact, because the expectation today is that the user-provided savepointPath is only used during the first deploy of the application (see: https://github.com/lyft/flinkk8soperator/blob/master/pkg/controller/flinkapplication/flink_state_machine.go#L578)

In the first deploy of a job, we use the provided savepoint (if any) while submitting the job. During an update, however, the field is not used. So I think we should be okay with keeping these states/transitions isolated. Let me know what you think.

Another thought that came to mind: as of v0.5 we have a BlueGreen deployment mode that allows a new job to be submitted without cancelling the old job. During an update in this mode, we take a savepoint of the old job (without cancellation). We should ensure that the BlueGreen deployment mode behavior is preserved in all updateModes.
