
Airflow DAG Task Exits with Return Code 1 #39601

Open
dineshkumar20 opened this issue May 14, 2024 · 4 comments
Labels: area:core, Can't Reproduce, kind:bug, needs-triage, pending-response, stale

Comments


dineshkumar20 commented May 14, 2024

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.8.4

What happened?

I'm encountering an issue while running a task in my Airflow DAG. The task uses a pex file to import code; although the Python callable executes without errors, the task exits with return code 1. I also see logging-related exceptions in the worker pod logs.

# test.py

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

def whisper_task():
    try:
        print("workflow started")
        pex_path = '/opt/airflow/pex/whisper_app.pex'
        import sys
        from pex.pex_bootstrapper import bootstrap_pex_env
        # pex files are zip-importable, so the archive itself goes on sys.path
        sys.path.append(pex_path)
        bootstrap_pex_env(pex_path)
        print("workflow completed")
    except Exception as e:
        print("Exception Occurred in workflow", e)
        raise  # re-raise, keeping the original traceback

dag = DAG('test-dag', description='DAG for Test IO Type')
start_task = EmptyOperator(task_id='start', queue='cpu', dag=dag)
# Bind the operator to a distinct name so it does not shadow the callable
whisper = PythonOperator(task_id='whisper_task', python_callable=whisper_task, queue='gpu', dag=dag)
end_task = EmptyOperator(task_id='end', queue='cpu', dag=dag)

# Define the task dependencies
start_task >> whisper >> end_task
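For context, the `sys.path.append(pex_path)` line works because pex files, like plain zip archives, are importable through Python's zipimport machinery. Here is a minimal, self-contained sketch of that mechanism using an ordinary zip; the module name `hello` and the archive path are illustrative, not taken from the issue:

```python
import os
import sys
import tempfile
import zipfile

# Build a throwaway zip archive containing one trivial module.
tmpdir = tempfile.mkdtemp()
archive = os.path.join(tmpdir, "demo.zip")
with zipfile.ZipFile(archive, "w") as zf:
    zf.writestr("hello.py", "GREETING = 'workflow started'\n")

# Appending the archive itself to sys.path makes its modules importable --
# the same mechanism sys.path.append(pex_path) relies on for the pex file.
sys.path.append(archive)
import hello

print(hello.GREETING)
```

This only covers the import side; `bootstrap_pex_env` additionally activates the pex's bundled dependencies, which is where the interaction with the worker's own interpreter state can matter.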

Logs from task

[2024-05-12, 18:38:42 JST] {logging_mixin.py:188} INFO - Preprocessing workflow completed
[2024-05-12, 18:38:42 JST] {python.py:202} INFO - Done. Returned value was: None
[2024-05-12, 18:38:44 JST] {configuration.py:2014} INFO - Reading the config from /opt/airflow/airflow.cfg
[2024-05-12, 18:38:44 JST] {settings.py:60} INFO - Configured default timezone UTC
[2024-05-12, 18:38:44 JST] {settings.py:541} INFO - Loaded airflow_local_settings from /opt/airflow/config/airflow_local_settings.py .
[2024-05-12, 18:38:44 JST] {logging_config.py:54} DEBUG - Unable to load custom logging, using default config instead
[2024-05-12, 18:38:44 JST] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-05-12, 18:38:44 JST] {dagrun.py:876} DEBUG - number of tis tasks for <DagRun asris-zoom-dag @ 2024-05-12 09:37:44.802824+00:00: manual__2024-05-12T09:37:44.802824+00:00, state:running, queued_at: 2024-05-12 09:37:44.859310+00:00. externally triggered: True>: 2 task(s)
[2024-05-12, 18:38:44 JST] {dagrun.py:897} DEBUG - number of scheduleable tasks for <DagRun asris-zoom-dag @ 2024-05-12 09:37:44.802824+00:00: manual__2024-05-12T09:37:44.802824+00:00, state:running, queued_at: 2024-05-12 09:37:44.859310+00:00. externally triggered: True>: 1 task(s)
[2024-05-12, 18:38:44 JST] {taskinstance.py:1988} DEBUG - <TaskInstance: asris-zoom-dag.end manual__2024-05-12T09:37:44.802824+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=0, skipped=0, failed=0, upstream_failed=0, removed=0, done=0, success_setup=0, skipped_setup=0), upstream_task_ids={'whisper_task'}
[2024-05-12, 18:38:44 JST] {taskinstance.py:1969} DEBUG - Dependencies not met for <TaskInstance: asris-zoom-dag.end manual__2024-05-12T09:37:44.802824+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=0, skipped=0, failed=0, upstream_failed=0, removed=0, done=0, success_setup=0, skipped_setup=0), upstream_task_ids={'whisper_task'}
[2024-05-12, 18:38:44 JST] {taskinstance.py:1988} DEBUG - <TaskInstance: asris-zoom-dag.end manual__2024-05-12T09:37:44.802824+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2024-05-12, 18:38:44 JST] {taskinstance.py:1988} DEBUG - <TaskInstance: asris-zoom-dag.end manual__2024-05-12T09:37:44.802824+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2024-05-12, 18:38:44 JST] {taskinstance.py:3312} INFO - 0 downstream tasks scheduled from follow-on schedule check

Logs from Worker pod

[2024-05-12 09:38:44,530: ERROR/ForkPoolWorker-7] [d78edabb-2643-4615-b93a-54a510d850fa] Failed to execute task.
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2360, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode, session=session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2532, in _execute_task_with_callbacks
    result = self._execute_task(context, task_orig)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2549, in _execute_task
    return _execute_task(self, context, task_orig)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 452, in _execute_task
    _record_task_map_for_downstreams(
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 917, in _record_task_map_for_downstreams
    if next(task.iter_mapped_dependants(), None) is None:  # No mapped dependants, no need to validate.
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/abstractoperator.py", line 356, in <genexpr>
    return (
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/abstractoperator.py", line 323, in _iter_all_mapped_downstreams
    from airflow.models.mappedoperator import MappedOperator
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/__init__.py", line 68, in <module>
    settings.initialize()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/settings.py", line 559, in initialize
    configure_orm()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/settings.py", line 249, in configure_orm
    log.debug("Setting up DB connection pool (PID %s)", os.getpid())
  File "/usr/local/lib/python3.9/logging/__init__.py", line 1434, in debug
    self._log(DEBUG, msg, args, **kwargs)
  File "/usr/local/lib/python3.9/logging/__init__.py", line 1589, in _log
    self.handle(record)
  File "/usr/local/lib/python3.9/logging/__init__.py", line 1599, in handle
    self.callHandlers(record)
  File "/usr/local/lib/python3.9/logging/__init__.py", line 1661, in callHandlers
    hdlr.handle(record)
  File "/usr/local/lib/python3.9/logging/__init__.py", line 952, in handle
    self.emit(record)
  File "/usr/local/lib/python3.9/logging/__init__.py", line 1086, in emit
    stream.write(msg + self.terminator)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 200, in write
    self.flush()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 207, in flush
    self._propagate_log(buf)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 188, in _propagate_log
    self.logger.log(self.level, remove_escape_codes(message))

What you think should happen instead?

The task should complete without errors.

How to reproduce

Create a pex file and import code from it like this:

pex_path = '/opt/airflow/pex/processor.pex'
import sys
import os
from pex.pex_bootstrapper import bootstrap_pex_env
sys.path.append(pex_path)
bootstrap_pex_env(pex_path)

Operating System

Debian GNU/Linux 12

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.19.0
apache-airflow-providers-celery==3.6.1
apache-airflow-providers-cncf-kubernetes==8.0.1
apache-airflow-providers-common-io==1.3.0
apache-airflow-providers-common-sql==1.11.1
apache-airflow-providers-docker==3.9.2
apache-airflow-providers-elasticsearch==5.3.3
apache-airflow-providers-ftp==3.7.0
apache-airflow-providers-google==10.16.0
apache-airflow-providers-grpc==3.4.1
apache-airflow-providers-hashicorp==3.6.4
apache-airflow-providers-http==4.10.0
apache-airflow-providers-imap==3.5.0
apache-airflow-providers-microsoft-azure==9.0.1
apache-airflow-providers-mysql==5.5.4
apache-airflow-providers-odbc==4.4.1
apache-airflow-providers-openlineage==1.6.0
apache-airflow-providers-postgres==5.10.2
apache-airflow-providers-redis==3.6.0
apache-airflow-providers-sendgrid==3.4.0
apache-airflow-providers-sftp==4.9.0
apache-airflow-providers-slack==8.6.1
apache-airflow-providers-smtp==1.6.1
apache-airflow-providers-snowflake==5.3.1
apache-airflow-providers-sqlite==3.7.1
apache-airflow-providers-ssh==3.10.1

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@dineshkumar20 added the area:core, kind:bug, and needs-triage labels on May 14, 2024

boring-cyborg bot commented May 14, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.


RNHTTR commented May 14, 2024

Given that this doesn't include the logic for bootstrap_pex_env, this isn't a reproducible example. Can you please share the entire traceback as well?

Instead of using pex, have you considered the KubernetesPodOperator or the PythonVirtualenvOperator?
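For reference, a minimal sketch of the PythonVirtualenvOperator alternative mentioned above: the callable runs in a freshly created virtualenv, isolated from the worker's interpreter. The DAG name, start date, and requirements list are assumptions for illustration, not taken from the reporter's setup:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonVirtualenvOperator

def run_whisper():
    # Imports must live inside the callable: it is serialized and executed
    # in the new virtualenv, not in the worker process.
    print("workflow started")

with DAG("test-dag-venv", start_date=datetime(2024, 5, 1), schedule=None) as dag:
    whisper = PythonVirtualenvOperator(
        task_id="whisper_task",
        python_callable=run_whisper,
        requirements=["pex"],  # assumed dependency
        system_site_packages=False,
    )
```

The trade-off is virtualenv creation time per task run, versus the pex approach's risk of mutating shared interpreter state.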

@RNHTTR added the Can't Reproduce and pending-response labels on May 14, 2024

dineshkumar20 commented May 14, 2024

@RNHTTR bootstrap_pex_env is a built-in method in the pex package. I have linked the source code here:
https://github.com/pex-tool/pex/blob/36dd2374569b5f3fbef54918e6ba3bd1ff852b61/pex/pex_bootstrapper.py#L704

I have considered using KubernetesPodOperator for my use case, but I don't want to create a new pod for each task; I want to reuse the same pod across multiple tasks.


This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.

github-actions bot added the stale label on May 29, 2024