Add support for CRDs for the Karmada Descheduler #4905
I just ran a quick example per the Flink Operator Quick Start; a simple FlinkDeployment looks like this (you can get it from here):

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.17
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless
```

I also dumped the live configuration here:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  creationTimestamp: "2024-05-07T08:59:32Z"
  finalizers:
  - flinkdeployments.flink.apache.org/finalizer
  generation: 2
  name: basic-example
  namespace: default
  resourceVersion: "30397"
  uid: fa8d217e-9859-4ccd-ba39-f4670aa6f3f5
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  flinkVersion: v1_17
  image: flink:1.17
  job:
    args: []
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    state: running
    upgradeMode: stateless
  jobManager:
    replicas: 1
    resource:
      cpu: 1
      memory: 2048m
  serviceAccount: flink
  taskManager:
    resource:
      cpu: 1
      memory: 2048m
status:
  clusterInfo:
    flink-revision: c0027e5 @ 2023-11-09T13:24:38+01:00
    flink-version: 1.17.2
    total-cpu: "2.0"
    total-memory: "4294967296"
  jobManagerDeploymentStatus: READY
  jobStatus:
    checkpointInfo:
      lastPeriodicCheckpointTimestamp: 0
    jobId: fb11661e5eebb5c2aea39ab0405f9b85
    jobName: State machine job
    savepointInfo:
      lastPeriodicSavepointTimestamp: 0
      savepointHistory: []
    startTime: "1715073594831"
    state: RUNNING
    updateTime: "1715073620122"
  lifecycleState: STABLE
  observedGeneration: 2
  reconciliationStatus:
    lastReconciledSpec: '{"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"entryClass":null,"args":[],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.17","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_17","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}}'
    lastStableSpec: '{"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"entryClass":null,"args":[],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.17","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_17","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}}'
    reconciliationTimestamp: 1715072375768
    state: DEPLOYED
  taskManager:
    labelSelector: component=taskmanager,app=basic-example
    replicas: 1
```

I guess your use case might be:
You want a feature that Karmada can schedule the
Thanks for the quick response @RainbowMango! Yes, the general use case is as you've described.
We want to be able to:
For point 2, it becomes a little tricky, as we would like to differentiate between application-level errors that result in pod failures (which Karmada shouldn't reschedule) and pod-scheduling errors. Ideally, the FlinkDeployment's status could reflect these differences so that Karmada can interpret the FlinkDeployment's health status accurately. We will need to discuss this further.
For the active-active configuration, I guess you mean deploying two FlinkDeployments on two clusters, resulting in two Flink clusters. I'm not familiar with Flink; does that mean the two Flink clusters collectively consume a single data stream?
Cluster-level failover is based on taints; you can set tolerations to control how long a deployment should wait before a cluster failure triggers rescheduling.
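For reference, the taint-toleration failover window can be expressed in a PropagationPolicy. This is a sketch only: the policy name, member cluster names, and the 60-second window are illustrative, and it assumes a default Karmada setup where unhealthy clusters are tainted with `cluster.karmada.io/not-ready`:

```yaml
apiVersion: policy.karmada.io/v1alpha1
kind: PropagationPolicy
metadata:
  name: flink-propagation   # hypothetical name
spec:
  resourceSelectors:
  - apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    name: basic-example
  placement:
    clusterAffinity:
      clusterNames:
      - member1   # illustrative member cluster names
      - member2
    # Tolerate an unhealthy cluster for 60s before evicting the
    # FlinkDeployment and rescheduling it to a healthy cluster.
    clusterTolerations:
    - key: cluster.karmada.io/not-ready
      operator: Exists
      effect: NoExecute
      tolerationSeconds: 60
```

Shorter `tolerationSeconds` means faster failover but more sensitivity to transient cluster flakiness.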
Yes, it would be great if we could observe the failover status from the FlinkDeployment's status. Can we do that?
Yes, we would deploy 2 identical FlinkDeployments, both configured with identical data stream sources that feed the applications the same data. In an active-passive configuration, the setup would be similar, except that only one data source would provide data, to the active application.
Thanks! Yes we are actively tuning these parameters. :)
We used a custom health interpreter to determine the FlinkDeployment's health, which seems to be working. We will confirm whether this is fully suited to our use cases after we finish testing.
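For anyone following along, such a custom health interpreter can be declared with Karmada's ResourceInterpreterCustomization. This is a sketch, not the poster's actual interpreter; the specific health conditions checked below are illustrative guesses based on the status dump above:

```yaml
apiVersion: config.karmada.io/v1alpha1
kind: ResourceInterpreterCustomization
metadata:
  name: flinkdeployment-health   # hypothetical name
spec:
  target:
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
  customizations:
    healthInterpretation:
      luaScript: |
        function InterpretHealth(observedObj)
          -- Treat a missing status as unhealthy.
          if observedObj.status == nil then
            return false
          end
          -- Illustrative conditions drawn from the dumped status fields.
          return observedObj.status.jobManagerDeploymentStatus == 'READY'
              and observedObj.status.lifecycleState == 'STABLE'
        end
```

Distinguishing application-level failures from scheduling failures (point 2 above) would require the FlinkDeployment status to expose that distinction so the Lua script can branch on it.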
What would you like to be added:
During our testing, we noticed that the descheduler filters out all resource types that are not Deployments. Because of this, we would like to propose adding descheduler support for CRDs (custom resources).
Why is this needed:
We aim to use Karmada specifically for its failover and descheduling capabilities with the FlinkDeployment CRD. Looking at previous issues, there seems to be interest in this type of support, and we believe a generic solution could benefit the community. We wanted to create this ticket to start a conversation and gather feedback and opinions on this type of support.