
Conversion process

Kamil Breguła edited this page May 8, 2019 · 1 revision

The conversion of an Oozie workflow XML file to an Airflow DAG takes place in two phases:

  1. Parsing a workflow
  2. Writing Python code

Parsing a workflow

The result of parsing is a Workflow object. It contains the DAG name, the input and output directory paths, the Airflow relations, the parsed nodes with their mappers, and the import statements. Parsing is divided into several steps:

  1. An XML file with Oozie workflow is loaded.
  2. XML elements are cleaned.
  3. Nodes are processed.
  4. Relations are created.
  5. Additional mapper actions are performed after processing.
  6. Trigger rules are updated.

As part of cleaning the XML code, XML namespaces are deleted and the names of actions are changed to conform to Python syntax. For example, Python variable names cannot contain a dash character, so dashes are replaced with underscores.

Nodes are processed using a Mapper and a ParsedNode. Each Mapper is responsible for only one type of node. A Mapper can convert one action into one or many Airflow tasks. All mappers inherit from the BaseMapper class.
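The cleaning step can be sketched with the standard library's xml.etree.ElementTree. This is a minimal illustration, not the converter's actual code: the helper names strip_namespaces, to_python_name and clean_names are hypothetical, and it assumes that node names and references to them live only in the name, to and start attributes.

```python
import re
import xml.etree.ElementTree as ET

WORKFLOW_XML = """<workflow-app xmlns="uri:oozie:workflow:0.5" name="demo-wf">
    <start to="first-action"/>
    <action name="first-action">
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail"><message>failed</message></kill>
    <end name="end"/>
</workflow-app>"""


def strip_namespaces(root: ET.Element) -> None:
    """Remove the '{namespace}' prefix from every tag in the tree, in place."""
    for elem in root.iter():
        if elem.tag.startswith("{"):
            elem.tag = elem.tag.split("}", 1)[1]


def to_python_name(name: str) -> str:
    """Replace dashes (and any other non-identifier character) with underscores."""
    return re.sub(r"\W", "_", name)


def clean_names(root: ET.Element) -> None:
    """Rewrite node names and node references (assumed: name/to/start attributes)."""
    for elem in root.iter():
        for attr in ("name", "to", "start"):
            if attr in elem.attrib:
                elem.attrib[attr] = to_python_name(elem.attrib[attr])


root = ET.fromstring(WORKFLOW_XML)
strip_namespaces(root)
clean_names(root)
print([(child.tag, child.get("name")) for child in root])
# [('start', None), ('action', 'first_action'), ('kill', 'fail'), ('end', 'end')]
```

Names starting with a digit would still not be valid identifiers; this sketch does not handle that case.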

Each Mapper has the following methods:

  • convert_to_text - Returns a string with the Python code of the equivalent Airflow operators and their relations.
  • required_imports - Returns a list of the import statements that the generated Python code requires.
  • on_parse_node - Called when processing a node.
  • on_parse_finish - Called when processing of all nodes is finished.
  • copy_extra_assets - Copies extra assets required by the generated DAG, such as script files and JARs.

It also has the following properties:

  • first_task_id
  • last_task_id

The first_task_id and last_task_id properties are used to build relations. The Mapper is stored in a ParsedNode, which also contains information about the connections to other nodes and the logic for updating trigger rules.
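A minimal sketch of the mapper interface follows, using the standard abc module. The method and property names come from this page; the constructor, the argument lists and the PrepareAndRunMapper subclass are hypothetical, chosen to show a mapper whose first_task_id and last_task_id differ because it emits two tasks for one action.

```python
from abc import ABC, abstractmethod
from typing import List


class BaseMapper(ABC):
    """Sketch of the mapper base class; names follow the page, signatures are assumed."""

    def __init__(self, name: str):
        self.name = name  # already a valid Python identifier after XML cleaning

    @abstractmethod
    def convert_to_text(self) -> str:
        """Return Python source for the Airflow task(s) and their internal relations."""

    @abstractmethod
    def required_imports(self) -> List[str]:
        """Return the import statements the generated code needs."""

    def on_parse_node(self) -> None:
        """Hook called while this node is being processed (no-op by default)."""

    def on_parse_finish(self, workflow: "Workflow") -> None:
        """Hook called after all nodes are processed; may modify the workflow."""

    def copy_extra_assets(self, input_directory_path: str, output_directory_path: str) -> None:
        """Copy scripts, JARs, etc. needed by the generated DAG (no-op by default)."""

    @property
    def first_task_id(self) -> str:
        return self.name  # single-task mappers start and end with the same task

    @property
    def last_task_id(self) -> str:
        return self.name


class PrepareAndRunMapper(BaseMapper):
    """Hypothetical mapper: one Oozie action becomes two chained Airflow tasks."""

    @property
    def first_task_id(self) -> str:
        return f"{self.name}_prepare"

    def required_imports(self) -> List[str]:
        return ["from airflow.operators.bash_operator import BashOperator"]

    def convert_to_text(self) -> str:
        first, last = self.first_task_id, self.last_task_id
        return (
            f"{first} = BashOperator(task_id='{first}', bash_command='echo prepare', dag=dag)\n"
            f"{last} = BashOperator(task_id='{last}', bash_command='echo run', dag=dag)\n"
            f"{first}.set_downstream({last})\n"
        )
```

Because relations between nodes only use first_task_id and last_task_id, the internal chain of a multi-task mapper stays hidden from the rest of the workflow.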

The logic of processing each type of node is different. In most cases, it includes operations such as:

  • Creating a Mapper object suitable for the node type.
  • Creating a ParsedNode object.
  • Calling the on_parse_node method of the mapper.
  • Saving the names of the downstream nodes in the ParsedNode object.
  • Saving import statements in the Workflow.
  • Saving the ParsedNode in the Workflow.
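The processing steps listed above can be sketched for a single action node as follows. ActionMapper, the ParsedNode and Workflow fields, and parse_action_node are simplified, hypothetical stand-ins for the real classes; the sketch only assumes that an Oozie action has an ok and an error transition.

```python
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set


class ActionMapper:
    """Hypothetical stand-in for a real mapper: one Airflow task per node."""

    def __init__(self, name: str):
        self.name = name
        self.first_task_id = name
        self.last_task_id = name

    def on_parse_node(self) -> None:
        pass

    def required_imports(self) -> List[str]:
        return ["from airflow.operators.dummy_operator import DummyOperator"]


@dataclass
class ParsedNode:
    mapper: ActionMapper
    downstream_names: List[str] = field(default_factory=list)
    error_downstream_name: Optional[str] = None


@dataclass
class Workflow:
    dag_name: str
    nodes: Dict[str, ParsedNode] = field(default_factory=dict)
    imports: Set[str] = field(default_factory=set)


def parse_action_node(action: ET.Element, workflow: Workflow) -> None:
    name = action.attrib["name"]
    mapper = ActionMapper(name)          # create a mapper suitable for the node type
    node = ParsedNode(mapper)            # wrap it in a ParsedNode
    mapper.on_parse_node()               # let the mapper react to the node
    ok, error = action.find("ok"), action.find("error")
    if ok is not None:                   # remember the downstream node names
        node.downstream_names.append(ok.attrib["to"])
    if error is not None:
        node.error_downstream_name = error.attrib["to"]
    workflow.imports.update(mapper.required_imports())  # save import statements
    workflow.nodes[name] = node          # save the ParsedNode in the workflow
```

Storing imports in a set means a statement needed by many nodes is written only once.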

Relations are created based on information from the ParsedNode objects built in the previous step. Every mapper has first_task_id and last_task_id properties. Relations are created for each node, in two cases:

  • The last task ID is linked to the first task ID of each downstream node.
  • The last task ID is linked to the first task ID of the error downstream node.
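The two linking rules can be sketched as a function that returns (upstream, downstream) task ID pairs. The Node class and create_relations are hypothetical simplifications of ParsedNode and the real relation-building code.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple


@dataclass
class Node:
    first_task_id: str
    last_task_id: str
    downstream_names: List[str] = field(default_factory=list)
    error_downstream_name: Optional[str] = None


def create_relations(nodes: Dict[str, Node]) -> Set[Tuple[str, str]]:
    """Return (upstream_task_id, downstream_task_id) pairs for all nodes."""
    relations: Set[Tuple[str, str]] = set()
    for node in nodes.values():
        # Case 1: last task of this node -> first task of every downstream node.
        for name in node.downstream_names:
            relations.add((node.last_task_id, nodes[name].first_task_id))
        # Case 2: last task of this node -> first task of the error downstream node.
        if node.error_downstream_name is not None:
            error_node = nodes[node.error_downstream_name]
            relations.add((node.last_task_id, error_node.first_task_id))
    return relations


nodes = {
    "shell_node": Node("shell_node_prepare", "shell_node", ["end"], "fail"),
    "end": Node("end", "end"),
    "fail": Node("fail", "fail"),
}
print(sorted(create_relations(nodes)))
# [('shell_node', 'end'), ('shell_node', 'fail')]
```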

Each mapper can make changes to the workflow: the on_parse_finish method is called for each mapper, with the Workflow object as its argument. For example, StartMapper and EndMapper use this mechanism to remove relations to themselves.

The last step updates the trigger rules. First, the purpose of each node must be determined, because a node can handle either the correct flow or the error flow. Then the trigger rules are updated according to the table below.

| Node used in correct flow | Node used in error flow | Trigger rule |
| --- | --- | --- |
| No | No | TriggerRule.DUMMY |
| No | Yes | TriggerRule.ONE_FAILED |
| Yes | No | TriggerRule.ONE_SUCCESS |
| Yes | Yes | TriggerRule.DUMMY (a warning is printed on the console) |

A node used in both the correct flow and the error flow has no valid representation in Airflow (see: Known Limitations).
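The last two parsing steps can be sketched together: a hypothetical StartMapper whose on_parse_finish removes relations involving the start node, and a function applying the trigger rule table. The sketch assumes a node belongs to the correct flow when some node lists it as an ok transition, and to the error flow when some node lists it as an error transition; the Workflow fields are simplified stand-ins, and trigger rules are returned as strings because they end up in generated code.

```python
import logging
from dataclasses import dataclass
from typing import Dict, List, Optional, Set, Tuple


@dataclass
class Workflow:
    relations: Set[Tuple[str, str]]
    downstream: Dict[str, List[str]]            # node name -> "ok" downstream names
    error_downstream: Dict[str, Optional[str]]  # node name -> "error" downstream name


class StartMapper:
    """Hypothetical start mapper: the start node produces no Airflow task."""

    def __init__(self, name: str):
        self.name = name

    def on_parse_finish(self, workflow: Workflow) -> None:
        # Drop every relation that involves the task-less start node.
        workflow.relations = {r for r in workflow.relations if self.name not in r}


def trigger_rule(in_correct_flow: bool, in_error_flow: bool) -> str:
    """Map a node's role to a trigger rule, following the table."""
    if in_correct_flow and in_error_flow:
        # No valid Airflow equivalent (see Known Limitations): warn and fall back.
        logging.warning("Node is used in both the correct and the error flow")
        return "TriggerRule.DUMMY"
    if in_error_flow:
        return "TriggerRule.ONE_FAILED"
    if in_correct_flow:
        return "TriggerRule.ONE_SUCCESS"
    return "TriggerRule.DUMMY"


def update_trigger_rules(workflow: Workflow) -> Dict[str, str]:
    ok_targets = {t for targets in workflow.downstream.values() for t in targets}
    error_targets = {t for t in workflow.error_downstream.values() if t is not None}
    names = set(workflow.downstream) | set(workflow.error_downstream)
    return {n: trigger_rule(n in ok_targets, n in error_targets) for n in sorted(names)}
```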

Writing Python code

The result of this phase is a Python file. This phase uses the jinja2 template engine and is divided into several steps:

  1. Writing import statements.
  2. Writing parameters.
  3. Writing the DAG header.
  4. Writing converted tasks.
  5. Writing relations.

The first step writes each import statement on a new line at the start of the file. The second step saves the parameters as a dict, serialized with json.dumps. The third step writes the DAG header, which contains basic information about the workflow: the DAG name, the schedule interval and the start date. The next step writes the result of calling the convert_to_text method for each node; this method creates the Airflow tasks and additional relations that correspond to a single Oozie action. The last step writes each relation on a new line at the end of the file.
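The five writing steps can be sketched as follows. To keep the sketch dependency-free, it uses plain string joining instead of the jinja2 templates the converter actually uses. write_dag and its parameters are hypothetical; it assumes the import list contains `from airflow import models` and that start_date is passed as a Python expression string. Sorting and deduplicating the imports is a choice made here, not something this page describes.

```python
import json
from typing import Dict, List, Tuple


def write_dag(
    imports: List[str],
    params: Dict[str, str],
    dag_name: str,
    schedule_interval: str,
    start_date: str,
    tasks_code: List[str],
    relations: List[Tuple[str, str]],
) -> str:
    sections = [
        # 1. One import statement per line at the start of the file.
        "\n".join(sorted(set(imports))),
        # 2. Parameters as a dict; json.dumps of string values is a valid Python literal.
        f"PARAMS = {json.dumps(params, indent=4)}",
        # 3. DAG header: name, schedule interval, start date.
        f"dag = models.DAG({dag_name!r}, schedule_interval={schedule_interval!r}, "
        f"start_date={start_date})",
        # 4. Converted tasks (output of each mapper's convert_to_text).
        "\n".join(tasks_code),
        # 5. One relation per line at the end of the file.
        "\n".join(f"{up}.set_downstream({down})" for up, down in relations),
    ]
    return "\n\n".join(sections) + "\n"


print(write_dag(
    imports=["from airflow import models", "from airflow.utils import dates"],
    params={"nameNode": "hdfs://localhost:8020"},
    dag_name="demo_wf",
    schedule_interval="@daily",
    start_date="dates.days_ago(0)",
    tasks_code=["a = DummyOperator(task_id='a', dag=dag)"],
    relations=[],
))
```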