KubernetesPodOperator with GCS FUSE sidecar sometimes stays running indefinitely #39625

Open
aptenodytes-forsteri opened this issue May 15, 2024 · 2 comments
Labels
area:providers · kind:bug · provider:cncf-kubernetes

Comments

aptenodytes-forsteri commented May 15, 2024

Apache Airflow Provider(s)

cncf-kubernetes

Versions of Apache Airflow Providers

No response

Apache Airflow version

airflow-2.7.3

Operating System

linux

Deployment

Google Cloud Composer

Deployment details

No response

What happened

I created a DAG with a KubernetesPodOperator that uses pod annotations to mount GCS buckets via the Google Cloud Storage FUSE Container Storage Interface (CSI) driver.

Due to what I believe to be another bug, GoogleCloudPlatform/gcs-fuse-csi-driver#257, the GCS FUSE sidecar container would sometimes stay running after the main container had finished.

From log messages, I determined the operator was stuck in an infinite loop here:

self.log.info("Pod %s has phase %s", pod.metadata.name, remote_pod.status.phase)

This appears to be a somewhat known issue: the function above already has a special case for the Istio sidecar. The same treatment should apply to all sidecars.
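
For context, the polling loop in PodManager.await_pod_completion looks roughly like the sketch below (paraphrased, not the verbatim provider source; only the istio_enabled branch allows the loop to exit while a sidecar keeps the pod out of a terminal phase):

# Paraphrased sketch of the provider's polling loop.
def await_pod_completion(self, pod: V1Pod, istio_enabled: bool = False,
                         container_name: str = "base") -> V1Pod:
    while True:
        remote_pod = self.read_pod(pod)
        if remote_pod.status.phase in PodPhase.terminal_states:
            break
        if istio_enabled and container_is_completed(remote_pod, container_name):
            break
        self.log.info("Pod %s has phase %s", pod.metadata.name, remote_pod.status.phase)
        time.sleep(2)
    return remote_pod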

What you think should happen instead

It should not be possible for a "rogue" sidecar container to put the KubernetesPodOperator into an infinite loop. This behavior ties up cluster resources and eventually clogs the whole cluster with zombie pods.

One possible fix would be an optional timeout for how long to wait for the pod to do its work.

Another possible fix would be to generalize the existing Istio sidecar handling to all types of sidecars.

How to reproduce

  1. Set up a Google Cloud Composer environment.
  2. Create a DAG that mounts two buckets ("in" and "out") using the GCS FUSE CSI driver:
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes import client
from kubernetes.client import models as k8s

# Placeholder schedule window for the example.
START_DATE = datetime(2024, 5, 1)
END_DATE = datetime(2024, 5, 2)

# Volumes and volume mounts for the 'in' and 'out' GCS buckets, mounted via the GCS FUSE CSI driver.
VOLUMES = [
    client.V1Volume(
        name="gcs-fuse-in",
        csi=client.V1CSIVolumeSource(
            driver="gcsfuse.csi.storage.gke.io",
            volume_attributes={
                "bucketName": "in-data",
                "mountOptions": "implicit-dirs",
                "fileCacheCapacity": "0",
            },
        ),
    )
]
VOLUME_MOUNTS = [client.V1VolumeMount(name="gcs-fuse-in", mount_path="/data")]

out_bucket = "out-data"
if out_bucket != "":
    VOLUMES.append(
        client.V1Volume(
            name="gcs-fuse-out",
            csi=client.V1CSIVolumeSource(
                driver="gcsfuse.csi.storage.gke.io",
                volume_attributes={
                    "bucketName": out_bucket,
                    "mountOptions": "implicit-dirs",
                    "fileCacheCapacity": "0",
                },
            ),
        )
    )
    VOLUME_MOUNTS.append(
        client.V1VolumeMount(name="gcs-fuse-out", mount_path="/out"))


@task
def fetch_new_log_ids():
    # Placeholder: the real DAG queries Postgres for new log IDs.
    return ["log-1", "log-2"]


@task
def generate_entry_point_commands(log_ids):
    # Placeholder: builds one container entry point command per log ID.
    return [["python", "main.py", log_id] for log_id in log_ids]


@dag(
    dag_id="example_dag",
    description="Attempts to reproduce the hanging pod issue.",
    default_args={
        "owner": "cloud_engineering",
        "email_on_failure": False,
        "retries": 1,
        "retry_delay": timedelta(seconds=10),
    },
    start_date=START_DATE,
    end_date=END_DATE,
    schedule_interval=timedelta(seconds=30),
    catchup=True,
)
def example_dag():
    log_ids = fetch_new_log_ids()
    container_entry_point_commands = generate_entry_point_commands(log_ids)

    KubernetesPodOperator.partial(
        name="process_log_example_dag",
        namespace="composer-user-workloads",
        config_file="/home/airflow/composer_kube_config",
        kubernetes_conn_id="kubernetes_default",
        task_id="process_log",
        image="us-central1-docker.pkg.dev/project/images/example-image",
        get_logs=True,
        log_events_on_failure=True,
        do_xcom_push=False,
        volumes=VOLUMES,
        volume_mounts=VOLUME_MOUNTS,
        # The GCS FUSE CSI driver relies on pod annotations to configure itself and the
        # sidecar container it injects.
        annotations={
            "gke-gcsfuse/volumes": "true",
            "gke-gcsfuse/ephemeral-storage-limit": "1Gi",
            "gke-gcsfuse/cpu-request": "500m",
            "gke-gcsfuse/memory-request": "1Gi",
            "gke-gcsfuse/ephemeral-storage-request": "1Gi",
        },
        container_resources=k8s.V1ResourceRequirements(
            limits={
                "memory": "3Gi",
                "cpu": "1",
                "ephemeral-storage": "0Gi",
            },
            requests={
                "memory": "3Gi",
                "cpu": "1",
                "ephemeral-storage": "0Gi",
            },
        ),
    ).expand(cmds=container_entry_point_commands)


example_dag()

The container runs a simple Python script that writes 100 files to /out and then raises an exception:

def main():
    # Write 100 small files to the mounted "out" bucket, then fail deliberately
    # so the main container exits with an error.
    for i in range(100):
        with open(f"/out/{i}.txt", "w", encoding="utf8") as f:
            f.write(f"{i}")
    raise Exception("oops")


if __name__ == "__main__":
    main()

Anything else

I can work around the issue with:

import time
from functools import cached_property

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.utils.pod_manager import (PodManager, PodPhase,
                                                                 container_is_completed)
from kubernetes.client.models.v1_pod import V1Pod


class MyPodManager(PodManager):

    def await_pod_completion(self, pod: V1Pod, istio_enabled: bool = False,
                             container_name: str = "base") -> V1Pod:
        # Stop polling after two hours instead of looping forever.
        start = time.time()
        while time.time() - start < 7200:
            remote_pod = self.read_pod(pod)
            if remote_pod.status.phase in PodPhase.terminal_states:
                break
            if istio_enabled and container_is_completed(remote_pod, container_name):
                break
            if container_is_completed(remote_pod, container_name):
                # Always break if the base container is completed, even if sidecar containers
                # continue to stay up and keep the pod running.
                self.log.info("Pod %s should terminate now", pod.metadata.name)
                break
            self.log.info("Pod %s has phase %s", pod.metadata.name, remote_pod.status.phase)
            time.sleep(2)
        return remote_pod


class MyKubernetesPodOperator(KubernetesPodOperator):

    @cached_property
    def pod_manager(self) -> PodManager:
        return MyPodManager(kube_client=self.client, callbacks=self.callbacks,
                            progress_callback=self._progress_callback)
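
The subclass is then a drop-in replacement for KubernetesPodOperator in the DAG above; only the class name changes (sketch with a trimmed argument list, reusing the names defined in the DAG above):

MyKubernetesPodOperator.partial(
    name="process_log_example_dag",
    namespace="composer-user-workloads",
    task_id="process_log",
    image="us-central1-docker.pkg.dev/project/images/example-image",
    volumes=VOLUMES,
    volume_mounts=VOLUME_MOUNTS,
    annotations={"gke-gcsfuse/volumes": "true"},
).expand(cmds=container_entry_point_commands)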

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

aptenodytes-forsteri added the area:providers, kind:bug, and needs-triage labels on May 15, 2024

boring-cyborg bot commented May 15, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

@ryaminal

Thanks for sharing this @aptenodytes-forsteri. I was asking on the Google issue tracker how to do this to simulate what is available on a GCP Composer cluster, e.g. /home/airflow/gcs/data. Someone on that issue mentioned they got it to work, and then I did too. As I was looking into whether I should open a PR/issue here, I stumbled upon this one.

It's not exactly the same issue, but I imagine more and more folks will want to do this and will want to mount multiple buckets/volumes.
