Merge pull request #10 from aelzeiny/submit-job-kwargs
Add more extensibility to AWS Batch
aelzeiny committed Apr 26, 2021
2 parents 5cfd688 + 320fe08 commit 6ce664e
Showing 3 changed files with 67 additions and 17 deletions.
7 changes: 7 additions & 0 deletions .travis.yml
@@ -1,8 +1,15 @@
dist: xenial
language: python
python:
- "3.6" # current default Python on Travis CI
- "3.7"
- "3.8"
before_install:
- sudo apt-get autoremove sqlite3
- sudo apt-get install python-software-properties
- sudo apt-add-repository -y ppa:linuxgndu/sqlite-nightly
- sudo apt-get -y update
- sudo apt-cache show sqlite3
install:
- pip install apache-airflow boto3 pylint isort marshmallow
env:
34 changes: 28 additions & 6 deletions airflow_aws_executors/batch_executor.py
@@ -125,17 +125,20 @@ def execute_async(self, key: TaskInstanceKeyType, command: CommandType, queue=None, executor_config=None):
        """
        if executor_config and 'command' in executor_config:
            raise ValueError('Executor Config should never override "command"')
-       job_id = self._submit_job(command, executor_config or {})
+       job_id = self._submit_job(key, command, queue, executor_config or {})
        self.active_workers.add_job(job_id, key)

-   def _submit_job(self, cmd: CommandType, exec_config: ExecutorConfigType) -> str:
+   def _submit_job(
+           self,
+           key: TaskInstanceKeyType,
+           cmd: CommandType, queue: str,
+           exec_config: ExecutorConfigType
+   ) -> str:
        """
        The command and executor config will be placed in the container-override section of the JSON request, before
-       calling Boto3's "run_task" function.
+       calling Boto3's "submit_job" function.
        """
-       submit_job_api = deepcopy(self.submit_job_kwargs)
-       submit_job_api['containerOverrides'].update(exec_config)
-       submit_job_api['containerOverrides']['command'] = cmd
+       submit_job_api = self._submit_job_kwargs(key, cmd, queue, exec_config)
        boto_run_task = self.batch.submit_job(**submit_job_api)
        try:
            submit_job_response = BatchSubmitJobResponseSchema().load(boto_run_task)

@@ -149,6 +152,25 @@ def _submit_job(self, cmd: CommandType, exec_config: ExecutorConfigType) -> str:
            )
        return submit_job_response['job_id']

    def _submit_job_kwargs(
            self,
            key: TaskInstanceKeyType,
            cmd: CommandType,
            queue: str, exec_config: ExecutorConfigType
    ) -> dict:
        """
        This modifies the standard kwargs to be specific to this task by overriding the Airflow command and updating
        the container overrides.
        It is one last chance to modify Boto3's "submit_job" kwargs before they are passed to the Boto3 client.
        For the latest kwarg parameters:
        .. seealso:: https://docs.aws.amazon.com/batch/latest/APIReference/API_SubmitJob.html
        """
        submit_job_api = deepcopy(self.submit_job_kwargs)
        submit_job_api['containerOverrides'].update(exec_config)
        submit_job_api['containerOverrides']['command'] = cmd
        return submit_job_api

    def end(self, heartbeat_interval=10):
        """
        Waits for all currently running tasks to end, and doesn't launch any tasks
43 changes: 32 additions & 11 deletions readme.md
@@ -14,7 +14,7 @@ For `AWS ECS/Fargate`: [Getting Started with AWS ECS/Fargate ReadMe](getting_sta


## How Airflow Executors Work
-Every time Apache Airflow wants to run a task, the Scheduler generates a CLI command that needs to be executed **somewhere**.
+Every time Apache Airflow wants to run a task, the Scheduler generates a shell command that needs to be executed **somewhere**.
Under the hood this command will run Python code, and it looks something like this:
```bash
airflow run <DAG_ID> <TASK_ID> <EXECUTION_DATE>
```
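To make "somewhere" concrete, here is a minimal sketch (not part of this repo) of the executor interface every backend implements; the local subprocess stands in for wherever the command actually runs:

```python
import subprocess

from airflow.executors.base_executor import BaseExecutor


class ToyLocalExecutor(BaseExecutor):
    """Toy sketch: 'somewhere' is simply a local subprocess."""

    def execute_async(self, key, command, queue=None, executor_config=None):
        # `command` is the `airflow run <DAG_ID> <TASK_ID> <EXECUTION_DATE>`
        # invocation above, already split into an argument list
        subprocess.Popen(command)

    def sync(self):
        # a real executor polls its backend here and reports task state
        pass
```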

@@ -61,28 +61,29 @@
The Celery Backend and worker queue also need attention and maintenance. I've tried
triggering CloudWatch Events, triggering capacity providers, triggering Application Autoscaling groups,
and it was a mess that I never got to work properly.

-#### The Case for AWS Fargate
+#### The Case for AWS Batch on AWS Fargate, and AWS Fargate
If you're on the Fargate executor, it may take ~2.5 minutes for a task to pop up, but at least it's a constant O(1) start-up time.
This way, the concept of tracking DAG Landing Times becomes unnecessary.
If you have more than 2000 concurrent tasks (which is a lot), you can always contact AWS to request an increase in this soft limit.


## AWS Batch v AWS ECS v AWS Fargate?
**I almost always recommend that you go the AWS Batch route**, especially since, as of Dec 2020, AWS Batch supports Fargate deployments. So unless you need some very custom flexibility provided by ECS, or have a particular reason to use AWS Fargate directly, go with AWS Batch.

`AWS Batch` - Is built on top of ECS, but has additional features for batch-job management, including auto-scaling the servers of an ECS cluster up and down based on jobs submitted to a queue. Generally easier to configure and set up than either other option.

`AWS Fargate` - Is a serverless container orchestration service; comparable to a proprietary AWS version of Kubernetes. Launching a Fargate Task is like saying "I want these containers to be launched somewhere in the cloud with X CPU and Y memory, and I don't care about the server". AWS Fargate is built on top of AWS ECS, and is easier to manage and maintain. However, it provides less flexibility.

`AWS ECS` - Is known as "Elastic Container Service", which is a container orchestration service that uses a designated cluster of EC2 instances that you operate, own, and maintain.

-I almost always recommend that you go the AWS Batch or AWS Fargate route unless you need some very custom flexibility provided by ECS.

| | Batch | Fargate | ECS |
|-------------------|-------------------------------------------------------------------------------------|---------------------------------------------|---------------------------------------------------|
-| Start-up per task | Instantaneous 3s, if capacity available; otherwise 2-3 minutes to launch a new server | 2-3 minutes per task; O(1) constant time | Instant 3s, or until capacity is available. |
-| Maintenance | You own, operate, and patch the servers | Serverless | You own, operate, and patch the servers |
+| Start-up per task | Combines both, depending on whether the job queue is Fargate serverless | 2-3 minutes per task; O(1) constant time | Instant 3s, or until capacity is available. |
+| Maintenance | You own, operate, and patch the servers OR Serverless (as of Dec 2020) | Serverless | You own, operate, and patch the servers |
| Capacity | Autoscales to configurable Max vCPUs in compute environment | ~2000 containers. See AWS Limits | Fixed. Not auto-scaling. |
-| Flexibility | High. Almost anything that you can do on an EC2 | Low. Can only do what AWS allows in Fargate | High. Almost anything that you can do on an EC2 |
-| Fractional CPUs? | No. Each task has 1 vCPU. | Yes. A task can have 0.25 vCPUs. | Yes. A task can have 0.25 vCPUs. |
+| Flexibility | Combines both, depending on whether the job queue is Fargate serverless | Low. Can only do what AWS allows in Fargate | High. Almost anything that you can do on an EC2 |
+| Fractional CPUs? | Yes; as of Dec 2020 a task can have 0.25 vCPUs. | Yes. A task can have 0.25 vCPUs. | Yes. A task can have 0.25 vCPUs. |


## Optional Container Requirements
@@ -98,10 +99,8 @@ task = PythonOperator(
    python_callable=lambda *args, **kwargs: print('hello world'),
    task_id='say_hello',
    executor_config=dict(
-       containerOverrides=dict(
-           vcpus=1,  # no fractional CPUs
-           memory=512
-       )
+       vcpus=1,
+       memory=512
    ),
    dag=dag
)
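To see where those keys end up, here is a runnable sketch of the merge that the new `_submit_job_kwargs` performs (the `submit_job_kwargs` template values below are illustrative assumptions, not the executor's actual defaults):

```python
from copy import deepcopy

# Illustrative stand-ins for the executor's configured submit_job_kwargs template
submit_job_kwargs = {
    'jobName': 'airflow-task',
    'jobQueue': 'my-job-queue',
    'jobDefinition': 'my-job-def',
    'containerOverrides': {},
}
# The operator's executor_config and the task's CLI command from above
exec_config = {'vcpus': 1, 'memory': 512}
cmd = ['airflow', 'run', 'example_dag', 'say_hello', '2021-04-26']

# The same merge the executor performs before calling boto3's batch.submit_job
submit_job_api = deepcopy(submit_job_kwargs)
submit_job_api['containerOverrides'].update(exec_config)
submit_job_api['containerOverrides']['command'] = cmd
assert submit_job_api['containerOverrides']['vcpus'] == 1
```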
@@ -220,6 +219,17 @@ CUSTOM_SUBMIT_JOB_KWARGS['retryStrategy'] = {'attempts': 3}
CUSTOM_SUBMIT_JOB_KWARGS['timeout'] = {'attemptDurationSeconds': 24 * 60 * 60}  # 24 hours
```

"I need more levers!!! I should be able to make changes to how the API gets called at runtime!"

```python
class CustomBatchExecutor(AwsBatchExecutor):
    def _submit_job_kwargs(self, task_id, cmd, queue, exec_config) -> dict:
        submit_job_api = super()._submit_job_kwargs(task_id, cmd, queue, exec_config)
        if queue == 'long_tasks_queue':
            submit_job_api['retryStrategy'] = {'attempts': 3}
            submit_job_api['timeout'] = {'attemptDurationSeconds': 24 * 60 * 60}  # 24 hours
        return submit_job_api
```
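Unlike the static `CUSTOM_SUBMIT_JOB_KWARGS` approach above, the subclass sees each task's key, command, queue, and `executor_config` at submit time, so it can vary the Boto3 call per task rather than globally.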

#### AWS ECS/Fargate
In this example we will modify the default `run_task_kwargs`. Note, however, there is nothing that's stopping us
@@ -244,6 +254,17 @@ CUSTOM_RUN_TASK_KWARGS['overrides']['containerOverrides'][0]['environment'] = [
]
```

"I need more levers!!! I should be able to make changes to how the API gets called at runtime!"

```python
class CustomFargateExecutor(AwsFargateExecutor):
    def _run_task_kwargs(self, task_id, cmd, queue, exec_config) -> dict:
        run_task_api = super()._run_task_kwargs(task_id, cmd, queue, exec_config)
        if queue == 'long_tasks_queue':
            # ECS's RunTask API has no Batch-style retryStrategy/timeout params,
            # so adjust the container overrides instead (env var is illustrative)
            run_task_api['overrides']['containerOverrides'][0]['environment'] = [
                {'name': 'IS_LONG_RUNNING_QUEUE', 'value': 'true'}
            ]
        return run_task_api
```
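For either subclass to take effect, Airflow has to load it as the executor. A hypothetical wiring, assuming your Airflow version accepts a dotted module path for the `executor` setting and that `my_company.executors` is a placeholder for wherever your subclass actually lives:

```bash
# Hypothetical module path -- swap in the real location of your subclass
export AIRFLOW__CORE__EXECUTOR=my_company.executors.CustomFargateExecutor
```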

## Issues & Bugs
Please file a ticket in GitHub for issues. Be persistent and be polite.
