Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[@parallel on Kubernetes] support for Jobsets #1804

Merged
merged 1 commit into from May 20, 2024

Conversation

valayDave
Copy link
Collaborator

@valayDave valayDave commented Apr 18, 2024

Implementation originates from [#1744] but now supersedes the PR.

This commit adds support for @parallel when flows are run --with kubernetes Support for Argo workflows will follow in a separate commit.

A user can run a flow with the following:

@step
def start(self):
    self.next(self.parallel_step, num_parallel=3)

@kubernetes(cpu=1, memory=512)
@parallel
@step
def parallel_step(self):
...

Comment on lines +452 to +482
num_parallel = None
if hasattr(flow, "_parallel_ubf_iter"):
num_parallel = flow._parallel_ubf_iter.num_parallel

if num_parallel and num_parallel >= 1 and ubf_context == UBF_CONTROL:
control_task_id, worker_task_ids = TaskIdConstructor.join_step_task_ids(
num_parallel
)
mapper_task_ids = [control_task_id] + worker_task_ids
flow._control_mapper_tasks = [
"%s/%s/%s" % (run_id, step_name, mapper_task_id)
for mapper_task_id in mapper_task_ids
]
flow._control_task_is_mapper_zero = True

if num_parallel and num_parallel > 1:
_setup_multinode_environment()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed so that Join steps has all the relevant task-ids.

Comment on lines +64 to +65
def create_job_spec(self):
client = self._client.get()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

has to create a sub-function called create_job_spec so we could reuse the jobspec created for K8s jobs and plug that in directly into Jobsets.

metaflow/plugins/kubernetes/kubernetes_job.py Outdated Show resolved Hide resolved
metaflow/plugins/kubernetes/kubernetes_job.py Outdated Show resolved Hide resolved
return overall_status, control_exit_code, control_pod_failed


def _construct_jobset_logical_status(jobset, control_pod=None):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Main function which helps compute the logical status of the jobset.

)


class RunningJobSet(object):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interface similar to RunningJob so that the runtime process can monitor the jobset.

).jobset_failed


class TaskIdConstructor:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This helps constructing all the task-ids from one place

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent! Thanks for this implementation. Otherwise, it could get hard figuring out how/where the task ids get constructed.

)


class KubernetesJobSet(object):
Copy link
Collaborator Author

@valayDave valayDave May 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It creates one control and one worker definition. The workers will have replicas set to num_parallel-1. All workers and control will leverage the "jobspec" created by the KubernetesJob interface.

metaflow/plugins/kubernetes/kubernetes_jobsets.py Outdated Show resolved Hide resolved
@@ -4,7 +4,7 @@

from metaflow.exception import MetaflowException

from .kubernetes_job import KubernetesJob
from .kubernetes_job import KubernetesJob, KubernetesJobSet
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need this import for Kubernetes clients which maybe getting used via extensions.

)


def _basic_validation_for_js(jobset):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

metaflow/plugins/kubernetes/kubernetes_job.py Outdated Show resolved Hide resolved
control_pod_status=None,
worker_pods_failed=False,
control_pod_failed=False,
some_jobs_are_running=False,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some_jobs_are_running helps derive if something is running or not.

Comment on lines +648 to +677
self._group = KUBERNETES_JOBSET_GROUP
self._version = KUBERNETES_JOBSET_VERSION
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making this configurable so that we can have code paths that deal with different GROUP /VERSIONS based on what they are set in the config.

num_parallel=num_parallel,
namespace=namespace,
)
worker_task_id = TaskIdConstructor.jobset_worker_id(task_id)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Single place where we construct the Task-id

@valayDave valayDave force-pushed the valay/oss-jobsets branch 2 times, most recently from 1bc4c17 to 8ca8cd3 Compare May 13, 2024 18:18
Comment on lines +186 to +191
if num_parallel is not None and num_parallel <= 1:
raise KubernetesException(
"Using @parallel with `num_parallel` <= 1 is not supported with Kubernetes. "
"Please set the value of `num_parallel` to be greater than 1."
)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

constraint added because jobset doesn't play nice with replicas = 0; Once kubernetes-sigs/jobset allow this, we can lift this constraint and add version logic to verify if the jobset can be submitted or not. Currently not supported with Jobset CRD version jobset.x-k8s.io/v1alpha2

Implementation originates from [Netflix#1744]

This commit adds support for @parallel when flows are run `--with kubernetes`
Support for Argo workflows will follow in a separate commit.

A user can run a flow with the following:

    @step
    def start(self):
        self.next(self.parallel_step, num_parallel=3)

    @kubernetes(cpu=1, memory=512)
    @parallel
    @step
    def parallel_step(self):
    ...

Some notes about the implementation:

- No annotations for task-id in pods since We cannot dynamically construct the task-id during K8s container runtime.
- @catch is currently not supported with @parallel on kubernetes
- metadata about jobset name exists in the task-metadata
- The jobset will contain two job definitions; One for control and one for worker.
- The worker will have n-1 replicas created.
- We construct the worker task-id determininstically using naming conventions and shell hacking.
- Jobset is considered running even if one job amongst all of them are running.
- @Retry will work with jobset
- num_parallel <=1 will NOT be supported to start with;
    - One core reason is that jobsets don't allow setting replicas to 0;
    - jobsets controller will mutate a jobset with replica set to 0 with replicas set to 1.
- The implementation accounts for Jobset CRD schema from v0.2.0
    - Jobset team changed the schema (just renaming values) after v0.3.0
    - The changes were to `replicatedJobsStatus` where certain fields were added and `ReplicatedJobsStatus` was renamed to `replicatedJobsStatus`
Comment on lines +130 to +139
def _retrieve_replicated_job_statuses(jobset):
# We needed this abstraction because Jobsets changed thier schema
# in version v0.3.0 where `ReplicatedJobsStatus` became `replicatedJobsStatus`
# So to handle users having an older version of jobsets, we need to account
# for both the schemas.
if jobset.get("status", {}).get("replicatedJobsStatus", None):
return jobset.get("status").get("replicatedJobsStatus")
elif jobset.get("status", {}).get("ReplicatedJobsStatus", None):
return jobset.get("status").get("ReplicatedJobsStatus")
return None
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed to add a conditional block to swallow the complexity for the user when the jobset controller may be of a version < 0.3.0. This is because jobset's schema had changed from v0.3.0. No logical changes to the schema, only additions and renaming of values like ReplicatedJobsStatus to replicatedJobsStatus

Comment on lines +202 to +205
if "suspended" in job_status:
# `replicatedJobStatus` didn't have `suspend` field
# until v0.3.0. So we need to account for that.
workers_are_suspended = job_status["suspended"] > 0
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suspend was also not a supported keyword in Jobset for version v0.2.0; This is why we need a conditional check over here.

@savingoyal savingoyal merged commit 1a38256 into Netflix:master May 20, 2024
25 of 26 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants