Flink tuning #14

Open
thodson-usgs opened this issue Jan 25, 2024 · 9 comments

@thodson-usgs
Contributor

thodson-usgs commented Jan 25, 2024

I think we need to tune Flink before this forge can really cook. Opening this issue to start the discussion.

Here are some initial ideas:

  1. Anytime I scale up, I start hitting ImagePullBackOff errors. To avoid this, we could clone our own Flink image and have the workers pull from it. I need to investigate to what extent this can be set up in Terraform. Helm seems like a natural place to start: https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/helm/
  2. I believe those ImagePullBackOffs are also causing jobs to fail with org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy. By default, I don't think Flink retries any failures. Once we configure a restart strategy, I think execution will be much more reliable.
  3. As the Flink configuration becomes more complicated, it might make sense to keep a separate flink-config.yaml that's read in with something like yamlencode(file("flink-config.yaml")). Some of this config may be tuned per runner, which is another reason to separate it from the static Terraform.
  4. I'm by no means an expert, but before diving deeper into Flink, we might revisit whether it's the right technology or whether Spark is a better choice for batch work. @ranchodeluxe has put in a lot of work getting Flink moving but has also expressed some frustration. I'm still holding out hope for Flink, but I think we'll have a better sense after some tuning. (Apparently Spark has already been investigated: Implement runner for apache spark, pangeo-forge-runner#133.)
  5. As discussed below, we can currently enable task restarts by configuring Flink via a recipe's config.py (see the sketch just after this list). But before we can enable job manager restarts (called High Availability in Flink), we'll need to add a shared filesystem where Flink can store job metadata.
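
To make item 5 concrete, here's a minimal sketch of what enabling task restarts in a recipe's config.py could look like. It's only an excerpt: the key names are taken from the full recommended configuration further down this thread, and nothing beyond that has been verified.

# config.py (minimal excerpt; see the full recommended configuration below)
c.FlinkOperatorBakery.flink_configuration = {
    # retry a failed task a few times before failing the whole job
    "restart-strategy": "fixed-delay",
    "restart-strategy.fixed-delay.attempts": "3",
    "restart-strategy.fixed-delay.delay": "10 s",
}
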
@cisaacstern
Member

This is exciting!


@thodson-usgs
Contributor Author

thodson-usgs commented Jan 26, 2024

Recommended Flink configuration

Currently, this configures Flink to handle task manager failures, but we will need additional changes to the deployment to handle job manager failures and to avoid the ImagePullBackOff failure mode that can occur as the number of nodes approaches the cluster's max limit. For now, using larger instances with more slots will help keep the cluster small and avoid ImagePullBackOffs.

Config

c.FlinkOperatorBakery.flink_configuration = {
    # https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
    "execution.runtime-mode": "BATCH",

    # recommended: set slots equal to the number of cores (t3.large has 2)
    "taskmanager.numberOfTaskSlots": "2",

    # configure according to your instance; we assume >= t3.large
    # https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/
    "taskmanager.memory.flink.size": "1536m",
    "taskmanager.memory.task.off-heap.size": "256m",
    "taskmanager.memory.jvm-overhead.max": "1024m",

    # BROKEN: job manager restart (HA)
    # https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/
    # broken because the cluster can't write to s3; need to create another store
    "high-availability.type": "kubernetes",
    "high-availability.storageDir": "s3://usgs-pforge-us-west-2-flink-cache/recovery",  # created manually

    # task restart
    # https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/
    "restart-strategy": "fixed-delay",
    "restart-strategy.fixed-delay.attempts": "3",
    "restart-strategy.fixed-delay.delay": "10 s",
}

Tests

Next step will be to test job manager failures and HA mode.
Test 0: Kill task manager during job
🟢 : Other tasks continue, failed task restarts, job succeeds.

Test 1: Kill jobmanager at startup
🟢 : New job manager starts and spins up task managers.

Test 2: Kill jobmanager during job
🔴 : Job manager restarts and tasks continue, but they eventually die. The Flink dashboard has no record of the job; it is aware of the zombie tasks but doesn't associate them with any job.
💡 I believe this is failing because the cluster doesn't have permission to write to storageDir (refer to Flink's HA documentation).
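
One quick way to test that hypothesis (a hypothetical diagnostic, assuming boto3 is available in a pod that runs with the job manager's service account, and using the bucket from the config above): try writing a zero-byte object under the storageDir prefix.

# Hypothetical permission check, run from inside the cluster.
# If this raises AccessDenied, Flink can't persist HA metadata either.
import boto3
from botocore.exceptions import ClientError

s3 = boto3.client("s3")
try:
    s3.put_object(
        Bucket="usgs-pforge-us-west-2-flink-cache",
        Key="recovery/permission-check",
        Body=b"",
    )
    print("write to storageDir succeeded")
except ClientError as err:
    print(f"write to storageDir failed: {err}")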

Test 3 (edge case): Kill jobmanager just as tasks complete.
🟡

Test 4 (edge case): Simultaneously kill job and task manager during job.
🟡

If at least tests 0, 1, and 2 pass, I believe we're safe to run on SPOT.

Props

  • @ranchodeluxe for the memory configuration.

Test Logs

Test 2 log:

2024-01-27 16:20:07,652 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Could not find main container flink-main-container in pod template, using empty one to initialize.
2024-01-27 16:20:08,255 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Recovered 5 pods from previous attempts, current attempt id is 2.
2024-01-27 16:20:08,260 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 5 workers from previous attempt.
2024-01-27 16:20:08,261 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-1 recovered from previous attempt.
2024-01-27 16:20:08,261 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-2 recovered from previous attempt.
2024-01-27 16:20:08,262 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-3 recovered from previous attempt.
2024-01-27 16:20:08,262 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-4 recovered from previous attempt.
2024-01-27 16:20:08,265 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-5 recovered from previous attempt.
2024-01-27 16:23:45,565 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Pending recovery taskmanagers 5 -> 4.
2024-01-27 16:23:45,566 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-4 is terminated. Diagnostics: Pod terminated, container termination statuses: [flink-main-container(exitCode=1, reason=Error, message=null)]
2024-01-27 16:23:45,855 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Pending recovery taskmanagers 4 -> 3.
2024-01-27 16:23:45,856 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-3 is terminated. Diagnostics: Pod terminated, container termination statuses: [flink-main-container(exitCode=1, reason=Error, message=null)]
2024-01-27 16:23:45,861 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Pending recovery taskmanagers 3 -> 2.
2024-01-27 16:23:45,861 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-2 is terminated. Diagnostics: Pod terminated, container termination statuses: [flink-main-container(exitCode=1, reason=Error, message=null)]
2024-01-27 16:25:08,280 WARN  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-1 did not register in PT5M, will stop it and request a new one if needed.
2024-01-27 16:25:08,281 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Stopping worker gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-1.
2024-01-27 16:25:08,281 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Stopping TaskManager pod gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-1.
2024-01-27 16:25:08,281 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Pending recovery taskmanagers 2 -> 1.
2024-01-27 16:25:08,287 WARN  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-5 did not register in PT5M, will stop it and request a new one if needed.
2024-01-27 16:25:08,287 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Stopping worker gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-5.
2024-01-27 16:25:08,287 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Stopping TaskManager pod gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-5.
2024-01-27 16:25:08,287 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Pending recovery taskmanagers 1 -> 0. Resource manager is ready to serve.

@ranchodeluxe
Collaborator

ranchodeluxe commented Jan 29, 2024

Nice of you to look into all this stuff (especially the HA things) @thodson-usgs 🥳 Thanks for doing this.

Thinking through some edge cases here.

I often think of HA as being most useful in streaming workflows, b/c if something goes badly wrong the job can recover and magically pick up from the last checkpoint. But in batch (let's assume you're using SPOT instances for this), I'm not sure how you'd differentiate between a job that failed b/c of a valid bug and a job that failed b/c the SPOT instance was terminated. In the former scenario, wouldn't the job just restart and fail indefinitely?

Anytime I scale up I start hitting ImagePullBackOff errors. To avoid this, we could clone our own Flink image and have the workers pull from it.

I haven't run into this yet b/c the rate limit is decent. Is your parallelism >= 100 workers already? AFAIK each EC2 node should be caching images for all workers on it, so this would be a hard limit to breach unless maybe you're spinning up a worker per EC2 node and there are hundreds of them (though I could be wrong about aspects here). I'm wondering if something else is possibly going on with your network? We should look into this.

@thodson-usgs
Contributor Author

thodson-usgs commented Jan 29, 2024

Good point! I won't know until I test this. I think a valid bug would cause a task failure, which is handled by a separate strategy, like retry 3 times. Hopefully, the job won't restart when the task restart strategy fails. (In my testing, a failing task doesn't cause the job manager to fail, so task failures shouldn't cause the job to rerun 🤞)

As for ImagePullBackOffs, I would typically get several when setting parallelism = 40, but that's using t3.large with one slot. I've had no issues using two slots (half as many image pulls), but eventually someone will run a big job and bump into this again.

@ranchodeluxe
Collaborator

I think a valid bug would cause a task failure, which is handled by a separate strategy, like retry 3 times

yummy, like that

that's using t3.large with one slot. I've had no issues using two slots (half as many image pulls), but eventually someone will run a big job and bump into this again

yeah, let's look at this b/c we can probably tune it to work differently, but I agree that certain patterns will violate it eventually

@thodson-usgs
Contributor Author

thodson-usgs commented Jan 30, 2024

ah, might you be referring to the adaptive scheduler?

jobmanager.scheduler: adaptive

Looks like jobmanager.scheduler: default uses an adaptive-batch scheduler for batch jobs. However, adaptive-batch will only manage the parallelism if it is not set explicitly (so setting parallelism in our config.py disables it), whereas adaptive will throttle your parallelism if resources aren't available.
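
If we do experiment with that, a minimal, untested sketch of the change in a recipe's config.py might look like the following (the jobmanager.scheduler key is the one from the Flink docs above; whether it plays well with our batch runtime mode still needs checking):

# Hypothetical config.py tweak -- not yet tested with this deployment
c.FlinkOperatorBakery.flink_configuration = {
    # "default" resolves to the adaptive-batch scheduler; "adaptive" instead
    # scales the requested parallelism down to the slots actually available
    "jobmanager.scheduler": "adaptive",
}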

@ranchodeluxe
Collaborator

ranchodeluxe commented Feb 1, 2024

ah, might you be referring to the adaptive scheduler?

Interesting, I've never seen that ☝️.

I'm talking about how the kubelet and container runtime work in k8s by default as described here: https://kubernetes.io/docs/concepts/containers/images/#image-pull-policy

I don't think any of the Flink images injected into the manifests use the :latest tag. If they do, then the default would be imagePullPolicy: Always, and that might explain your behaviour. But my assumption is we're getting imagePullPolicy: IfNotPresent for all images in each deploy, so we should check.

Assuming we have a decent-sized node, all the above means is that when many pods are scheduled per node, only the first pod needs to request the image; all the other worker pods are served the cached image from the container runtime (per node). This of course assumes they are using the same image tag too.

In your case, when 40 workers get scheduled, we should only really be hitting Docker Hub once per node, for however many nodes your workers get scheduled on. So that would be a good thing to concretize and get numbers on. Then there is the upper limit on node autoscaling to think about; it's probably something reasonable like 10. So it's an interesting problem that you seem to be running into.
