Merge pull request #20 from optimizely/sc-states
Better handling of job state
stathischaritos committed Jul 5, 2022
2 parents a9a84bd + d291fdd commit 3398bec
Showing 4 changed files with 94 additions and 56 deletions.
19 changes: 12 additions & 7 deletions kubeluigi/__init__.py
@@ -91,10 +91,11 @@ def build_job_definition(self) -> V1Job:
         )
         return job
 
-    def onpodstarted(self, pod):
-        logger.info(
-            f"Tail the Pod logs using: kubectl logs -f -n {pod.namespace} {pod.name}"
-        )
+    def onpodstarted(self, pods):
+        for pod in pods:
+            logger.info(
+                f"Tail the Pod logs using: kubectl logs -f -n {pod.namespace} {pod.name}"
+            )
 
     def as_yaml(self):
         job = self.build_job_definition()
@@ -110,15 +111,19 @@ def run(self):
             run_and_track_job(self.kubernetes_client, job, self.onpodstarted)
         except FailedJob as e:
             logger.exception(
-                f"Luigi's job has failed running: {e.job.metadata.name}, {e.pod.status.message}"
+                f"Luigi's job has failed running: {e.job.metadata.name}"
             )
+            for pod in e.pods:
+                logger.exception(
+                    f"Luigi's job has failed running: {pod.status.message}"
+                )
             raise
         except Exception:
             logger.exception(f"Luigi has failed to run: {job}, starting cleaning")
             raise
-        finally:
+        else:
             clean_job_resources(self.kubernetes_client, job)
 
     def output(self):
         """
         An output target is necessary for checking job completion unless
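Two behavioral changes are visible in this file: onpodstarted now receives the full list of pods backing the job rather than a single pod, and clean_job_resources moves from finally: to the else: branch, so a failed job's resources are left in place for inspection. A minimal sketch of a task overriding the new hook (the MyTask class is illustrative, not part of this PR):

    from kubeluigi import KubernetesJobTask

    class MyTask(KubernetesJobTask):

        def onpodstarted(self, pods):
            # The hook now gets every pod of the job; mirror the default
            # implementation's log line for each one.
            for pod in pods:
                print(f"Tail the Pod logs using: kubectl logs -f -n {pod.namespace} {pod.name}")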
67 changes: 53 additions & 14 deletions kubeluigi/k8s.py
@@ -29,9 +29,9 @@


 class FailedJob(Exception):
-    def __init__(self, job, pod, message):
+    def __init__(self, job, pods, message):
         self.job = job
-        self.pod = pod
+        self.pods = pods
         self.message = message
         super().__init__(self.message)

@@ -160,15 +160,54 @@ def get_job_pods(job) -> List[V1Pod]:
     ).items
 
 
+def reduce_job_state(pods: List[V1Pod]):
+    pod_states = []
+    error_message = "Unknown Reason of Failure"
+    job_state = "Mixed"
+
+    for pod in pods:
+
+        pod_state = pod.status.phase
+
+        # Boil down all container states into one pod state.
+        for status in pod.status.container_statuses:
+            if status.state.waiting and status.state.waiting.reason == "InvalidImageName":
+                pod_state = "Failed"
+                error_message = "Invalid Image"
+
+            if status.state.terminated and status.state.terminated.reason == 'Error':
+                pod_state = "Failed"
+
+        pod_states.append(pod_state)
+
+    # Boil down all pod states into one job state.
+
+    # If all states are the same, set that as the job state
+    if len(set(pod_states)) == 1:
+        job_state = pod_states[0]
+
+    # If one is Failed, then the job is Failed
+    if "Failed" in pod_states:
+        job_state = "Failed"
+
+    return job_state, error_message
+
+
 def job_phase_stream(job):
-    previous_phase = {}
+    previous_job_state = None
 
     while True:
 
         sleep(DEFAULT_POLL_INTERVAL)
         pods = get_job_pods(job)
-        for pod in pods:
-            if previous_phase.get(pod.metadata.name, None) != pod.status.phase:
-                yield pod.status.phase, pod
-                previous_phase[pod.metadata.name] = pod.status.phase
+        job_state, error_message = reduce_job_state(pods)
+
+        # Only yield job state changes
+        if job_state != previous_job_state:
+            yield job_state, pods, error_message
+
+        # Update state tracker
+        previous_job_state = job_state
 
 
 def are_all_pods_successful(job):
@@ -184,18 +223,18 @@ def run_and_track_job(
"""
logger.debug(f"Submitting job: {job.metadata.name}")
job = kick_off_job(k8s_client, job)
for phase, pod in job_phase_stream(job):
logger.debug(f"Task {job.metadata.name} state is {phase}")
for state, pods, error_message in job_phase_stream(job):
logger.debug(f"Task {job.metadata.name} state is {state}")

# ToDo: Check if we handle : Scale up but not enough resources
# Warning: Running onpodstarted is not guaranteed to execute all times..
if phase == "Running":
onpodstarted(pod)
if state == "Running":
onpodstarted(pods)

if phase == "Failed":
raise FailedJob(job, pod, "Failed pod in Job")
if state == "Failed":
raise FailedJob(job, pods, error_message)

if phase == "Succeeded" and are_all_pods_successful(job):
if state == "Succeeded" and are_all_pods_successful(job):
return


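The reduction rules in reduce_job_state are easiest to see on concrete inputs. A minimal sketch, assuming stand-in objects instead of real V1Pod instances (the fake_pod helper is illustrative and fills in only the fields reduce_job_state actually reads):

    from types import SimpleNamespace
    from kubeluigi.k8s import reduce_job_state

    def fake_pod(phase, waiting_reason=None, terminated_reason=None):
        # Mirror just the attributes reduce_job_state inspects:
        # status.phase and status.container_statuses[*].state.
        state = SimpleNamespace(
            waiting=SimpleNamespace(reason=waiting_reason) if waiting_reason else None,
            terminated=SimpleNamespace(reason=terminated_reason) if terminated_reason else None,
        )
        status = SimpleNamespace(phase=phase, container_statuses=[SimpleNamespace(state=state)])
        return SimpleNamespace(status=status)

    reduce_job_state([fake_pod("Running"), fake_pod("Running")])   # ("Running", ...)
    reduce_job_state([fake_pod("Running"), fake_pod("Pending")])   # ("Mixed", ...)
    reduce_job_state([fake_pod("Running"), fake_pod("Failed")])    # ("Failed", ...)
    reduce_job_state([fake_pod("Pending", waiting_reason="InvalidImageName")])  # ("Failed", "Invalid Image")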
33 changes: 29 additions & 4 deletions test/kubernetes_helpers_test.py
@@ -10,6 +10,7 @@
     get_container_with_volume_mounts,
     attach_volume_to_spec,
     job_phase_stream,
+    reduce_job_state
 )
 
 from kubernetes.client import V1Pod, V1PodCondition
@@ -143,15 +144,15 @@ def test_job_phase_stream(mocked_get_job_pods):
     pod.status = MagicMock()
     pod.status.phase = "Running"
     mocked_get_job_pods.return_value = [pod]
-    phase, pod_with_changed_state = next(job_phase_stream(job))
+    phase, pods_with_changed_state, error_message = next(job_phase_stream(job))
     assert phase == "Running"
-    assert pod.metadata.name == pod_with_changed_state.metadata.name
+    assert pod.metadata.name == pods_with_changed_state[0].metadata.name
 
     pod.status.phase = "Succeeded"
     mocked_get_job_pods.return_value = [pod]
-    phase, pod_with_changed_state = next(job_phase_stream(job))
+    phase, pods_with_changed_state, error_message = next(job_phase_stream(job))
     assert phase == "Succeeded"
-    assert pod.metadata.name == pod_with_changed_state.metadata.name
+    assert pod.metadata.name == pods_with_changed_state[0].metadata.name
 
 
 @patch("kubeluigi.k8s.get_job_pods")
@@ -199,3 +200,27 @@ def test_kick_off_job():
     client.create_namespaced_job.assert_called_with(
         body=job, namespace=job.metadata.namespace
     )
+
+
+@patch("kubeluigi.k8s.get_job_pods")
+def test_reduce_job_state(mocked_get_job_pods):
+    labels = {"l1": "label1"}
+
+    pod1 = pod_spec_from_dict("name_of_pod", dummy_pod_spec, labels=labels)
+    pod1.status = MagicMock()
+    pod1.status.phase = "Running"
+
+    pod2 = pod_spec_from_dict("name_of_pod", dummy_pod_spec, labels=labels)
+    pod2.status = MagicMock()
+    pod2.status.phase = "Failed"
+
+    job_state, error_message = reduce_job_state([pod1, pod2])
+    assert job_state == "Failed"
+
+    pod2.status.phase = "Pending"
+    job_state, error_message = reduce_job_state([pod1, pod2])
+    assert job_state == "Mixed"
+
+    pod2.status.phase = "Running"
+    job_state, error_message = reduce_job_state([pod1, pod2])
+    assert job_state == "Running"
31 changes: 0 additions & 31 deletions test/test_kubernetes_job_task.py
@@ -63,37 +63,6 @@ def test_job_definition_as_yaml():
     assert yaml_as_dict["spec"]["template"]["spec"]["volumes"] == []
 
 
-@patch("kubeluigi.run_and_track_job")
-@patch("kubeluigi.clean_job_resources")
-@patch.object(KubernetesJobTask, "build_job_definition")
-@patch.object(KubernetesJobTask, "_init_kubernetes")
-def test_failing_task_clean_resources(
-    mocked_init_kubernetes,
-    mocked_build_job_definition,
-    mocked_clean_job_resources,
-    mocked_run_and_track_job,
-):
-    """
-    testing k8s resources are cleaned when running job fails.
-    """
-    task = DummyTask()
-    task._init_task_metadata()
-    task.kubernetes_client = MagicMock()
-    task._KubernetesJobTask__logger = MagicMock()
-
-    class DummyException(Exception):
-        pass
-
-    e = DummyException()
-    mocked_run_and_track_job.side_effect = e
-    with pytest.raises(DummyException):
-        task.run()
-
-    mocked_build_job_definition.assert_called_once()
-    mocked_clean_job_resources.assert_called_once()
-    mocked_clean_job_resources.assert_called_once()
-
-
 def test_name_not_implemented():
     task = KubernetesJobTask()
