
Multiple root pipelines or pipelines collections #57

Open · ghost opened this issue Nov 4, 2020 · 2 comments

ghost commented Nov 4, 2020

By design, there is one root pipeline to which all other pipelines are added. This makes sense when you have a single pipeline doing a single task, but I have several pipelines which I don't want to execute together: e.g. a pipeline for a daily refresh with several incremental refreshes, a pipeline to execute a complete full load, a pipeline running at a specific time to refresh specific data areas on demand, etc.

I came up with the following ideas for how this could be solved:

  • Multiple root pipelines --> when more than one is added, you have to specify the root pipeline name as an additional parameter in flask mara_pipeline.ui.run --pipeline ...
  • Add a PipelineCollection class (see the sketch below). This pipeline class would hold a collection of pipelines. You can't run a PipelineCollection itself, but you can run its sub-pipelines. This class could then be set as the root pipeline, and a pipeline would be called via flask mara_pipeline.ui.run --path <pipeline_name>.<path within the pipeline>
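
To make the second idea concrete, here is a minimal sketch of what such a class could look like. It is purely hypothetical, not an existing mara_pipelines API; it assumes that Pipeline can be subclassed and that Pipeline.add(node) works as in current mara_pipelines, and the pipeline objects in the usage example are made up:

from mara_pipelines import pipelines


class PipelineCollection(pipelines.Pipeline):
    """A root node that only groups pipelines; it is never run as a whole."""

    def add(self, node, upstreams=None):
        # only whole pipelines may be added, so the collection can be run
        # piece by piece via --path
        if not isinstance(node, pipelines.Pipeline):
            raise ValueError('A PipelineCollection can only contain pipelines')
        return super().add(node, upstreams)


# hypothetical usage: group otherwise independent root-level pipelines
collection = PipelineCollection(id='root', description='All pipelines of this project')
collection.add(daily_refresh_pipeline)  # hypothetical pipeline objects
collection.add(full_load_pipeline)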

Does anyone have other ideas? Is there maybe a common way to solve this that I am not aware of?

ghost changed the title from "Multiple root or pipelines collections" to "Multiple root pipelines or pipelines collections" on Nov 4, 2020
jankatins (Member) commented Jan 22, 2021

I've added an extra runner which looks at certain labels in the pipeline when executing:

pipeline = Pipeline(
    ...,
    # It's after 04:00 to give the load jobs in DV a bit of time to download everything...
    labels={'run_mode': 'once_a_day',  # other values: always, manual
            'run_after': '04:00:01',
            'run-nightly-stg': True}
)

This is the runner:

import sys
import os

import click
import datetime

# data_integration_run -> the "head" pipeline which is started, e.g. this runner always starts the root pipeline
# data_integration_node_run -> individual pipelines/nodes which are run
# node_path -> array with path elements from the root pipeline. E.g. "[]" is the root pipeline,
#              "['operational_system']" for OS

__last_runtime_query = '''
SELECT
  max(end_time)
FROM data_integration_node_run
WHERE succeeded
  AND node_path = ARRAY[%s];
'''

__already_running_processes = '''
SELECT run_id, start_time, node_path
FROM data_integration_run
WHERE end_time IS NULL
  -- "self healing" in case a job is not closed. But long enough to not start
  -- if a legitimate run is still running
  AND start_time > now() - INTERVAL '12 hours'
ORDER BY start_time DESC
LIMIT 1
'''


def __debug(msg: str):
    if os.environ.get('DEBUG'):
        print(msg)


def _should_run_once_a_day(pipeline_id: str, run_after: str, start_ts: datetime.datetime):
    from mara_db.postgresql import postgres_cursor_context
    # We only look at today, if a pipeline was not successfully running yesterday,
    # this is ignored after midnight UTC
    run_after_time = datetime.time.fromisoformat(run_after).replace(tzinfo=datetime.timezone.utc)
    run_after_ts = datetime.datetime.combine(start_ts.date(), run_after_time)
    # No need to check anything if we are not yet after the run_after time
    __debug(f'  start_ts:     {start_ts}')
    __debug(f'  run_after_ts: {run_after_ts}')
    if start_ts < run_after_ts:
        __debug(f'  not yet time -> should NOT run')
        return False

    # We might run, but should check if we already ran
    with postgres_cursor_context('mara') as cursor:  # type: psycopg2.extensions.cursor
        cursor.execute(__last_runtime_query, (pipeline_id,))
        last_run_ts = cursor.fetchone()[0]
        __debug(f'  last_run_ts:  {last_run_ts}')
        if last_run_ts is None:
            # we've never run the pipeline
            __debug(f'  never run -> should run')
            return True
        if last_run_ts > run_after_ts:
            # we already did a successful run
            __debug(f'  already ran today -> should NOT run')
            return False
    # no successful run after the run_after time -> do a run
    __debug(f'  did not run yet -> should run')
    return True


def _ensure_no_other_running_etl():
    from mara_db.postgresql import postgres_cursor_context

    # Check whether another ETL run is already in progress
    with postgres_cursor_context('mara') as cursor:  # type: psycopg2.extensions.cursor
        cursor.execute(__already_running_processes)
        row = cursor.fetchone()
        if row is None:
            # no other ETL running
            return True
        else:
            import app.slack
            run_id, start_ts, node_path = row
            info = f'run_id: {run_id}, start_ts: {start_ts}, node_path: {node_path}'
            msg = f"Found already running ETL, aborting: ({info})"
            print(msg)
            app.slack.notify_slack_info(msg)
            # exit with a success number, one slack message is enough
            sys.exit(0)


@click.command()
@click.option('--overwrite-once-a-day', default=False, is_flag=True,
              help='Include all "once_a_day" pipelines in the run')
@click.option('--overwrite-manual', default=False, is_flag=True,
              help='Include all "manual" pipelines in the run')
@click.option('--overwrite-already-running-etl', default=False, is_flag=True,
              help="Don't check and abort, if an ETL is already running")
def run_root_pipeline(overwrite_once_a_day: bool, overwrite_manual: bool, overwrite_already_running_etl: bool):
    """Runs configured nodes in the root pipeline"""
    from mara_pipelines.ui.cli import run_pipeline
    from mara_pipelines import config, pipelines

    __debug(f'overwrite_once_a_day:          {overwrite_once_a_day}')
    __debug(f'overwrite_manual:              {overwrite_manual}')
    __debug(f'overwrite_already_running_etl: {overwrite_already_running_etl}')

    # we always take the root pipeline here but remove nodes which only need to run once a day
    # after a configured time of day in UTC

    # A pipeline can have certain labels:
    # 'run_mode':'once_a_day', or 'manual', or 'always' (= default)
    # 'run_after':'01:00:00' -> if 'run_mode == 'once_a_day', then run it once after this time
    #                           time is in UTC

    # We use the start of the overall run, not the start of the individual pipeline
    start_ts = datetime.datetime.now(tz=datetime.timezone.utc).replace(microsecond=0)
    os.environ['PGAPPNAME'] = f'mara_etl_framework__{start_ts.isoformat()}'

    if not overwrite_already_running_etl:
        _ensure_no_other_running_etl()

    root = config.root_pipeline()

    # a list of nodes (= pipelines) to run selectively in the pipeline
    _nodes = set()

    # Check which nodes should be run
    for node in root.nodes.values():
        __debug(f'Node: {node.id}')
        if not isinstance(node, pipelines.Pipeline):
            # for now we would just run it...
            __debug(f'  Not pipeline -> running')
            _nodes.add(node)
            continue
        run_mode = node.labels.get('run_mode', 'always')
        if run_mode == 'always':
            __debug(f'  mode: always -> running')
            _nodes.add(node)
            continue
        elif run_mode == 'once_a_day':
            if overwrite_once_a_day:
                __debug(f'  mode: once_a_day + overwrite -> running')
                _nodes.add(node)
                continue
            run_after = node.labels.get('run_after', '00:00:00')
            if _should_run_once_a_day(str(node.id), run_after, start_ts):
                __debug(f'  mode: once_a_day + time -> running')
                _nodes.add(node)
            else:
                __debug(f'  mode: once_a_day + already_run_today -> NOT running')
                print(f"NOT running pipeline {node.id} (already ran today).")
            continue
        else:
            # this assumes that anything else is manual mode
            if overwrite_manual:
                __debug(f'  mode: manual + overwrite -> running')
                _nodes.add(node)
            else:
                __debug(f'  mode: manual -> NOT running')
                print(f"NOT running pipeline {node.id} (set to manual).")
            continue
    __debug(str([n.id for n in _nodes]))
    if not run_pipeline(root, _nodes, False):
        sys.exit(-1)


@click.command()
@click.option('--path', default='',
              help='The parent ids of the pipeline to run, separated by comma. Example: "pipeline-id,sub-pipeline-id".')
@click.option('--nodes',
              help='IDs of sub-nodes of the pipeline to run, separated by comma. When provided, then only these nodes are run. Example: "do-this, do-that".')
@click.option('--with-upstreams', default=False, is_flag=True,
              help='Also run all upstreams of --nodes within the pipeline.')
@click.option('--only-with-label', default='',
              help='Only execute if a label with that name is present in the pipeline labels and its value is truthy.')
@click.option('--overwrite-already-running-etl', default=False, is_flag=True,
              help="Don't check and abort, if an ETL is already running")
def run(path, nodes, with_upstreams: bool, only_with_label: str, overwrite_already_running_etl: bool):
    """Runs a pipeline or a sub-set of its nodes"""
    # copied from mara_pipelines/ui/cli.py with the addition of checking for already running ETLs and the
    # possibility to overwrite
    from mara_pipelines.ui.cli import run_pipeline
    from mara_pipelines import pipelines

    # --- ADDED: START ---- # compared to the upstream run command
    # Make sure we can identify this
    start_ts = datetime.datetime.now(tz=datetime.timezone.utc).replace(microsecond=0)
    os.environ['PGAPPNAME'] = f'mara_etl_framework__{start_ts.isoformat()}'

    if not overwrite_already_running_etl:
        _ensure_no_other_running_etl()
    # --- ADDED: END ----

    # the pipeline to run
    path = path.split(',') if path else []
    pipeline, found = pipelines.find_node(path)
    if not found:
        print(f'Pipeline {path} not found', file=sys.stderr)
        sys.exit(-1)
    if not isinstance(pipeline, pipelines.Pipeline):
        print(f'Node {path} is not a pipeline, but a {pipeline.__class__.__name__}', file=sys.stderr)
        sys.exit(-1)

    # a list of nodes to run selectively in the pipeline
    _nodes = set()
    # avoid shadowing the builtin `id`; strip whitespace so "do-this, do-that" works
    for node_id in (nodes.split(',') if nodes else []):
        node_id = node_id.strip()
        node = pipeline.nodes.get(node_id)
        if not node:
            print(f'Node "{node_id}" not found in pipeline {path}', file=sys.stderr)
            sys.exit(-1)
        else:
            _nodes.add(node)

    # --- ADDED: START ----
    if only_with_label:
        if with_upstreams:
            print(f'Cannot handle --only-with-label together with --with-upstreams.', file=sys.stderr)
            sys.exit(-1)

        potential_nodes = _nodes if _nodes else pipeline.nodes.values()
        _nodes = set()
        for n in potential_nodes:
            if isinstance(n, pipelines.Pipeline):
                if n.labels.get(only_with_label, False):
                    _nodes.add(n)
            else:
                _nodes.add(n)
        if not _nodes:
            print(f'No nodes found with label "{only_with_label}".')
            sys.exit(-1)
    __debug(f'pipeline: {pipeline.id}')
    __debug(f'nodes: {str([n.id for n in _nodes])}')
    # --- ADDED: END ----

    if not run_pipeline(pipeline, _nodes, with_upstreams, interactively_started=False):
        sys.exit(-1)

# [Some CLI commands omitted, like a command to ensure that the ETL db exists or something to print an env file from the current local config so one can test docker stuff]
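
For completeness, these commands could be hooked into the flask CLI the usual mara way; a minimal sketch, assuming the MARA_CLICK_COMMANDS convention from mara-app applies in your project:

# in the module's __init__.py (assumption: mara-app picks up click commands
# from a MARA_CLICK_COMMANDS module attribute)
MARA_CLICK_COMMANDS = [run_root_pipeline, run]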

One can probably merge run and run_root_pipeline to get all the command-line flags in one place... If there is interest in getting this functionality merged here, I can submit a PR.

ghost (Author) commented Jan 27, 2021

I think we should work together on a PR for this. I already built something similar:
master...hz-lschick:add-run-label-filter

Instead of starting the main run_pipeline method with a custom _nodes list, I made adjustments directly in the mara_pipelines module. The core differences between our code are:

  • In your use case you (probably) focused only on selecting specific pipelines; in my code I focused on selecting specific nodes. I use it, for example, to "deploy" a database (CREATE/DROP/...) without any data-loading interactions (SELECT/INSERT/UPDATE/DELETE/...). The use case was that I needed faster feedback on my dev errors, e.g. when renaming a column in a table.
  • Your code can filter on specific labels. I added a label-filter option which lets you filter the nodes based on a python-like filter string, e.g. run-nightly-stg and run_mode==once_a_day (see def label_filter_applies_to_node(..) and the sketch below).
  • Since you focused on pipeline execution, you just take the general node cost. I made an adjustment to use different node costs when a different label-filter is used for the execution.
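
For illustration, here is a minimal sketch of how such a filter string could be evaluated against a node's labels. This is not the code from the linked branch (the real label_filter_applies_to_node may work differently); the regex-based term rewriting is an assumption:

import re


def label_filter_applies_to_node(filter_string: str, labels: dict) -> bool:
    """Evaluates a python-like filter such as 'run-nightly-stg and run_mode==once_a_day'
    against a node's labels dict (sketch only)."""

    def replace_term(match) -> str:
        term = match.group(0)
        if term in ('and', 'or', 'not'):  # keep boolean operators as-is
            return term
        if '==' in term:
            key, value = term.split('==', 1)
            return str(labels.get(key) == value)
        # a bare label name: True if present with a truthy value
        return str(bool(labels.get(term)))

    # rewrite every term to True/False, then evaluate the remaining boolean expression
    translated = re.sub(r'[\w.:-]+(?:==[\w.:-]+)?', replace_term, filter_string)
    return eval(translated)  # filter strings come from the command line, not untrusted input


# e.g. label_filter_applies_to_node('run-nightly-stg and run_mode==once_a_day',
#                                   {'run-nightly-stg': True, 'run_mode': 'once_a_day'})  # -> True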

I certainly agree that my solution - adjusting the mara_pipelines module - has its downsides, and I would love to work together to bring a label-filter execution option into the master branch. I would like to get feedback from @martin-loetzsch on what he thinks about this. Maybe we should create a separate issue for that.

But back to the topic: my main reason for this issue was that I would like a safer way in mara to manage multiple pipelines, without e.g. the possibility that someone accidentally executes a full load and an incremental load together by running the root pipeline.


What I am missing is an alternative to using one big root pipeline.
