AwsBatchExecutor is unable to adopt orphaned task instances #14

Open
mattellis opened this issue May 12, 2021 · 0 comments

We've been successfully using the Airflow Batch Executor in a production AWS environment for a number of weeks now, where previously we ran a Celery Executor on an EC2 ASG. Overall it's working very well, but we've had to patch the executor to solve a couple of minor issues for our Airflow implementation.

This may be intentional behaviour, but we recently discovered that the Batch Executor (and also the Fargate Executor) is unable to adopt orphaned tasks, and that it terminates all running Batch jobs when it receives the terminate() callback. We have a regular release cycle (every day or two), during which the scheduler instances get replaced. As per normal Airflow behaviour, the newly booted scheduler/executor will try to "adopt orphaned task instances" that were started by the terminated SchedulerJob(s).

However, with the AwsBatchExecutor we now get task failures every time the scheduler is deployed; these show up as a SIGTERM sent to each running LocalTaskJob on the Batch job containers. An example task log showing this issue:

[2021-04-29 10:02:05,102] {{logging_mixin.py:104}} INFO - Running <TaskInstance: ******** 2021-04-28T00:00:00+00:00 [running]> on host ********
[2021-04-29 10:02:05,606] {{taskinstance.py:1255}} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=********
AIRFLOW_CTX_DAG_ID=********
AIRFLOW_CTX_TASK_ID=********
AIRFLOW_CTX_EXECUTION_DATE=2021-04-28T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-04-28T00:00:00+00:00
...
[2021-04-29 10:04:28,398] {{local_task_job.py:187}} WARNING - State of this instance has been externally set to None. Terminating instance.
[2021-04-29 10:04:28,399] {{process_utils.py:100}} INFO - Sending Signals.SIGTERM to GPID 117
[2021-04-29 10:04:28,400] {{taskinstance.py:1239}} ERROR - Received SIGTERM. Terminating subprocesses.
[2021-04-29 10:04:28,580] {{taskinstance.py:1455}} ERROR - Task received SIGTERM signal
...
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.8/site-packages/airflow/providers/amazon/aws/operators/batch.py", line 148, in execute
    self.monitor_job(context)
  File "/usr/local/lib/python3.8/site-packages/airflow/providers/amazon/aws/operators/batch.py", line 205, in monitor_job
    raise AirflowException(e)
airflow.exceptions.AirflowException: Task received SIGTERM signal
[2021-04-29 10:04:28,582] {{taskinstance.py:1496}} INFO - Marking task as FAILED.
[2021-04-29 10:04:28,652] {{process_utils.py:66}} INFO - Process psutil.Process(pid=117, status='terminated', exitcode=1, started='10:02:04') (117) terminated with exit code 1

Presumably the terminate() method does this to clean up running Batch jobs in case the scheduler/executor is shutting down and not resuming. However, for users with rolling scheduler deployments this causes unnecessary failures. To support adoption of orphaned tasks, the BatchExecutor just needs to store the AWS Batch job_id in the TaskInstance.external_executor_id field when it submits a job, and then implement the BaseExecutor.try_adopt_task_instances method. That method simply needs to pass each orphaned task instance's key and external_executor_id to the active_workers.add_job method of the newly booted executor.
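
For illustration, here is a minimal sketch of what the adoption side could look like. The try_adopt_task_instances hook is real Airflow API; the active_workers.add_job signature is an assumption based on how this executor tracks submitted jobs, and the other half (writing the Batch job_id into external_executor_id at submit time) is not shown. This is not a tested patch:

```python
# Rough sketch only: try_adopt_task_instances as it might be added to AwsBatchExecutor.
from typing import List

from airflow.models.taskinstance import TaskInstance


def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]:
    """Re-attach to Batch jobs started by a terminated SchedulerJob.

    Per the BaseExecutor contract, returns the task instances that could
    NOT be adopted; the scheduler will reset those.
    """
    not_adopted = []
    for ti in tis:
        if ti.external_executor_id:
            # external_executor_id holds the Batch job_id stored at submit time,
            # so the new executor can resume polling that job for completion
            # instead of the task being reset and failed.
            self.active_workers.add_job(ti.external_executor_id, ti.key)
        else:
            not_adopted.append(ti)
    return not_adopted
```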

A test implementation yields the desired result, from the scheduler instance / batch executor logs:
[Screenshot: scheduler / batch executor logs showing the orphaned task instances being adopted after a scheduler restart]

Given that some users may have a different deployment architecture, I propose that this behaviour be toggled by a configuration option such as [batch].adopt_orphaned_task_instances (bool). I'd be happy with defaulting this to True or False, depending on the main intention of this Executor implementation. A hypothetical check is sketched below.
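
As a rough illustration of the toggle (the option name is just the proposal above and does not exist today, and _adopt_by_job_id is a hypothetical helper wrapping the earlier sketch):

```python
# Hypothetical: gate adoption on the proposed [batch] option.
from typing import List

from airflow.configuration import conf
from airflow.models.taskinstance import TaskInstance


def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]:
    if not conf.getboolean("batch", "adopt_orphaned_task_instances", fallback=False):
        # Behave as today: adopt nothing, so the scheduler resets these task instances.
        return list(tis)
    return self._adopt_by_job_id(tis)  # hypothetical helper implementing the sketch above
```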

PS. We were actually considering writing a Batch Executor ourselves when we came across this implementation with perfect timing, so thanks for open-sourcing this great Executor solution. Hopefully it can be integrated into Airflow mainline at some point, as I believe it has real legs for anyone running a custom Airflow stack on AWS.
