Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handling exception getting logs when pods finish success #39296

Merged
merged 12 commits into from
May 28, 2024
17 changes: 12 additions & 5 deletions airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from deprecated import deprecated
from kubernetes.client import CoreV1Api, V1Pod, models as k8s
from kubernetes.stream import stream
from kubernetes.client.exceptions import ApiException
from urllib3.exceptions import HTTPError

from airflow.configuration import conf
Expand Down Expand Up @@ -766,9 +767,15 @@ def _clean(self, event: dict[str, Any]):
# Skip await_pod_completion when the event is 'timeout', because the pod can hang
# on the ErrImagePull or ContainerCreating step, in which case it will never complete
if event["status"] != "timeout":
self.pod = self.pod_manager.await_pod_completion(
self.pod, istio_enabled, self.base_container_name
)
try:
self.pod = self.pod_manager.await_pod_completion(
self.pod, istio_enabled, self.base_container_name
)
except ApiException as e:
if e.status == 404:
self.pod = None
amoghrajesh marked this conversation as resolved.
Show resolved Hide resolved
else:
raise e
if self.pod is not None:
self.post_complete_action(
pod=self.pod,
Expand Down Expand Up @@ -796,11 +803,11 @@ def write_logs(self, pod: k8s.V1Pod, follow: bool = False, since_time: DateTime
line = raw_line.decode("utf-8", errors="backslashreplace").rstrip("\n")
if line:
self.log.info("Container logs: %s", line)
except HTTPError as e:
except (HTTPError, ApiException) as e:
self.log.warning(
"Reading of logs interrupted with error %r; will retry. "
"Set log level to DEBUG for traceback.",
e,
e if not isinstance(e, ApiException) else e.reason,
)

def post_complete_action(self, *, pod, remote_pod, **kwargs):
Expand Down
24 changes: 24 additions & 0 deletions tests/providers/cncf/kubernetes/operators/test_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import pendulum
import pytest
from kubernetes.client import ApiClient, V1Pod, V1PodSecurityContext, V1PodStatus, models as k8s
from kubernetes.client.exceptions import ApiException
from urllib3 import HTTPResponse

from airflow.exceptions import AirflowException, AirflowSkipException, TaskDeferred
Expand Down Expand Up @@ -1998,6 +1999,29 @@ def test_async_write_logs_should_execute_successfully(
else:
mock_manager.return_value.read_pod_logs.assert_not_called()

@patch(KUB_OP_PATH.format("post_complete_action"))
@patch(KUB_OP_PATH.format("extract_xcom"))
@patch(HOOK_CLASS)
@patch(KUB_OP_PATH.format("pod_manager"))
def test_async_write_logs_handler_api_exception(
    self, mock_manager, mocked_hook, mock_extract_xcom, mock_post_complete_action
):
    """Check that an ``ApiException`` raised while reading pod logs is logged as a warning.

    ``patch`` decorators are applied bottom-up, so the injected mocks arrive in
    the order ``pod_manager``, ``HOOK_CLASS``, ``extract_xcom``,
    ``post_complete_action``. Every decorator needs a matching parameter,
    otherwise the call fails with ``TypeError`` before the body runs.
    """
    # ``side_effect`` makes the mock *raise* the exception; ``return_value``
    # would merely hand the exception object back as if it were a log stream.
    mock_manager.read_pod_logs.side_effect = ApiException(status=404)
    mocked_hook.return_value.get_pod.return_value = k8s.V1Pod(
        metadata=k8s.V1ObjectMeta(name=TEST_NAME, namespace=TEST_NAMESPACE)
    )
    mock_extract_xcom.return_value = "{}"
    k = KubernetesPodOperator(
        task_id="task",
        get_logs=True,
        deferrable=True,
    )
    # The warning spy has to be installed BEFORE the operator runs; swapping
    # it in afterwards asserts against a mock that was never called.
    # ``patch.object`` also restores the real logger method on exit.
    with patch.object(k.log, "warning") as mock_warning:
        # NOTE(review): assumes ``run_pod_async`` drives the code path that
        # calls ``write_logs`` for a deferrable operator -- confirm against
        # the helper's definition.
        self.run_pod_async(k)

    mock_warning.assert_called_once()

@pytest.mark.parametrize(
"log_pod_spec_on_failure,expect_match",
[
Expand Down