@hookimpl on_dag_run_running, on_dag_run_success, on_dag_run_failed do not find Connections and Variables #39646

Open
1 of 2 tasks
ChrnvaN opened this issue May 15, 2024 · 1 comment
Labels: affected_version:2.8, area:Listeners, kind:bug

ChrnvaN commented May 15, 2024

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.8.1

What happened?

I wrote a listener plugin and added it to the plugins directory. The listener includes the methods:

  • on_task_instance_running
  • on_task_instance_success
  • on_task_instance_failed
  • on_dag_run_running
  • on_dag_run_success
  • on_dag_run_failed

When I try to read Variables and Connections inside on_dag_run_running, on_dag_run_success, or on_dag_run_failed, the following error is raised as soon as a DAG run starts:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 52, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 79, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/job.py", line 393, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/job.py", line 422, in execute_job
    ret = execute_callable()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job_runner.py", line 855, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job_runner.py", line 987, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job_runner.py", line 1063, in _do_scheduling
    self._start_queued_dagruns(session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job_runner.py", line 1404, in _start_queued_dagruns
    dag_run.notify_dagrun_state_changed()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 862, in notify_dagrun_state_changed
    get_listener_manager().hook.on_dag_run_running(dag_run=self, msg=msg)
  File "/home/airflow/.local/lib/python3.9/site-packages/pluggy/_hooks.py", line 493, in __call__
    return self._hookexec(self.name, self._hookimpls, kwargs, firstresult)
  File "/home/airflow/.local/lib/python3.9/site-packages/pluggy/_manager.py", line 115, in _hookexec
    return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
  File "/home/airflow/.local/lib/python3.9/site-packages/pluggy/_callers.py", line 113, in _multicall
    raise exception.with_traceback(exception.__traceback__)
  File "/home/airflow/.local/lib/python3.9/site-packages/pluggy/_callers.py", line 77, in _multicall
    res = hook_impl.function(*args)
  File "/opt/airflow/plugins/metadata/airflow_metadata.py", line 206, in on_dag_run_running
    my_connection = BaseHook.get_connection("my_connection")
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/hooks/base.py", line 82, in get_connection
    conn = Connection.get_connection_from_secrets(conn_id)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/connection.py", line 479, in get_connection_from_secrets
    raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
airflow.exceptions.AirflowNotFoundException: The conn_id `my_connection` isn't defined

However, the connection is explicitly defined under Connections, and the same lookups work in the task-instance listener methods.

How can I make these lookups work when listening to DagRun events as well?

What you think should happen instead?

I expect Connection and Variable values to be retrieved when listening to DagRun events, just as they are when listening to task instances.

How to reproduce

Add a Connection named "my_connection" and a Variable named "environment", then place the following listener plugin in the plugins directory:

from airflow.listeners import hookimpl
from airflow.models.taskinstance import TaskInstance
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from airflow.models.dagrun import DagRun
from airflow.plugins_manager import AirflowPlugin


class AirflowListener:
    @hookimpl
    def on_task_instance_running(self, task_instance: TaskInstance) -> None:
        my_connection = BaseHook.get_connection("my_connection")
        env = Variable.get("environment")

    @hookimpl
    def on_task_instance_success(self, task_instance: TaskInstance) -> None:
        my_connection = BaseHook.get_connection("my_connection")
        env = Variable.get("environment")

    @hookimpl
    def on_task_instance_failed(self, task_instance: TaskInstance) -> None:
        my_connection = BaseHook.get_connection("my_connection")
        env = Variable.get("environment")

    @hookimpl
    def on_dag_run_running(self, dag_run: DagRun):
        my_connection = BaseHook.get_connection("my_connection")
        env = Variable.get("environment")

    @hookimpl
    def on_dag_run_success(self, dag_run: DagRun):
        my_connection = BaseHook.get_connection("my_connection")
        env = Variable.get("environment")

    @hookimpl
    def on_dag_run_failed(self, dag_run: DagRun):
        my_connection = BaseHook.get_connection("my_connection")
        env = Variable.get("environment")
        
        
class AirflowListenerPlugin(AirflowPlugin):
    name = "AirflowListener"
    listeners = [AirflowListener()]
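
The traceback shows the DagRun hook being called from the scheduler (`scheduler_command.py`), whereas task-instance hooks typically run in the process that executes the task, which with CeleryExecutor is a worker. One way to check whether the two environments differ is to log where each hook runs. The sketch below uses only the standard library; `describe_process` and `format_event` are hypothetical helper names, and the assumption that a differing environment is involved is unconfirmed.

```python
import os
import socket
import sys
from typing import Dict


def describe_process() -> Dict[str, str]:
    """Collect a few facts about the process the hook is running in."""
    return {
        "host": socket.gethostname(),
        "pid": str(os.getpid()),
        "command": " ".join(sys.argv[:3]),
        # The [secrets] backend option, if it was set through the environment
        "secrets_backend": os.environ.get("AIRFLOW__SECRETS__BACKEND", "<unset>"),
        # Names only (never values) of connections supplied via environment
        "env_connections": ",".join(
            sorted(k for k in os.environ if k.startswith("AIRFLOW_CONN_"))
        ),
    }


def format_event(event: str) -> str:
    """Render one log line such as 'on_dag_run_running: host=... pid=...'."""
    details = " ".join(f"{key}={value!r}" for key, value in describe_process().items())
    return f"{event}: {details}"


print(format_event("on_dag_run_running"))
```

Calling `format_event(...)` at the top of each hook and comparing the logged lines from the DagRun hooks with those from the task-instance hooks would show whether they run on different hosts or with different settings.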

Operating System

macOS Sonoma 14.1.2

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

apache/airflow:2.8.1-python3.9
executor: CeleryExecutor

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

ChrnvaN added the area:core, kind:bug, and needs-triage labels May 15, 2024

boring-cyborg bot commented May 15, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

RNHTTR added the area:Listeners label and removed the area:core and needs-triage labels May 15, 2024
eladkal added the affected_version:2.8 label May 21, 2024