
Handle SIGTERM #40

Open
jankatins opened this issue May 21, 2020 · 6 comments

Comments

@jankatins
Member

jankatins commented May 21, 2020

K8s sends a SIGTERM, followed by a waiting period and then a SIGKILL. The pipeline runner currently only handles SIGINT (Ctrl-C, via the KeyboardInterrupt exception Python raises for this key combination, which triggers the generic except: block in the main loop).

By default, Python reacts to SIGTERM by terminating immediately. This results in situations where the pipeline run is abruptly killed, leaving open runs behind.

How to reproduce:

  • Run a pipeline
  • Kill all processes belonging to the run with SIGTERM (kill <list of pids> or stop the docker container)

-> the runs (data_integration_run) and node_runs (data_integration_node_run) are left open (end_time is NULL).

We use the runs to prevent a second run from starting and the node_runs to display Prometheus metrics per main pipeline (last run, currently running, ...). Leaving them open means someone has to close them manually :-(
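
A minimal sketch (not from this issue, just an illustration) of how SIGTERM could be routed into the same KeyboardInterrupt path that already handles Ctrl-C:

import signal

# With Python's default disposition, SIGTERM terminates the process outright:
# no exception is raised, so the generic except: cleanup in the main loop never
# runs and the runs/node_runs stay open. A handler like this would instead turn
# SIGTERM into the existing Ctrl-C cleanup path.
def _sigterm_to_keyboard_interrupt(signum, frame):
    raise KeyboardInterrupt()

signal.signal(signal.SIGTERM, _sigterm_to_keyboard_interrupt)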

@jankatins
Member Author

If the goal is to gracefully kill all tasks and show the output for all of them, then the interesting parts are:

  • The tasks are executed in another process
  • The signal handler does not give control back to the rest of the program (in the same process)

My current idea would be:

In the main process:

  • Add a handler which listens for SIGTERM and SIGINT (more?)
  • If triggered, signal the run() process to shut down and then simply return to let the main loop handle the incoming messages

In run() process

  • Add a 'shutdown' (SIGTERM) signal handler which signals the still-running tasks to shut down
  • Add an alarm signal handler which sends SIGKILL to all still-running tasks and returns to let the process send the output events

(k8s waits 30 sec between SIGTERM and SIGKILL: https://cloud.google.com/blog/products/gcp/kubernetes-best-practices-terminating-with-grace)

Maybe it would also be good to add an atexit function which closes the open run/node_run associated with the current run (but only if they are still open) by refactoring some of the except: handling into this function.
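
A sketch of what such an atexit cleanup could look like. The table and column names data_integration_run, data_integration_node_run and end_time come from the issue description above; the succeeded column, the run_id parameter and the cursor_factory helper are assumptions for illustration:

import atexit


def register_run_cleanup(run_id: int, cursor_factory):
    """Register an atexit hook that closes the run/node_run rows of `run_id`
    if they are still open (end_time IS NULL) when the process exits."""

    def close_open_runs():
        # cursor_factory is assumed to yield a DB-API cursor on the mara database
        with cursor_factory() as cursor:
            cursor.execute(
                "UPDATE data_integration_node_run SET end_time = now(), succeeded = FALSE "
                "WHERE run_id = %s AND end_time IS NULL", (run_id,))
            cursor.execute(
                "UPDATE data_integration_run SET end_time = now(), succeeded = FALSE "
                "WHERE run_id = %s AND end_time IS NULL", (run_id,))

    atexit.register(close_open_runs)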

@jankatins
Member Author

I spent some time on this (see #41 for some changes I needed in data-integration; the signal handler itself can live elsewhere, as it gets inherited on fork).

  • Making Flask shut down the running tasks in all cases is kind of madness: Flask SIGKILLs itself. So whenever Ctrl-C is received in the main loop, the forked processes are left running without a signal handler. Any such signal handler has to be set at import time to get in between (at runtime we are no longer in the main thread, and signal handlers can only be set in the main thread). For the Ctrl-C case this works (with the changes in the PR above); for any other signal I haven't found a way :-(. Forwarding a KeyboardInterrupt on SIGTERM might work, I haven't tried that yet.
  • Just killing everything is ok-ish (save the PID of the main process at import time; in the signal handler kill all children of it), but a graceful shutdown does not work at all in Flask (due to the SIGKILL).

I have a signal handler which I install in root_pipeline() (so it works on the command line, but fails to install in Flask -> basically equivalent to installing it in run_pipeline()) which

  • sets a guard against a second run
  • sends SIGTERM to all children
  • if it's not in the main process, sends SIGTERM to the parent (to bubble up, e.g. when the system statistics process or a task receives the signal)
  • if it's the main process: waits a second and sends SIGTERM to all children again (to work around the fact that we cannot prevent the scheduler from starting new tasks before it has received the first failure)
  • if it's the main process: schedules an alarm to SIGKILL all children in 10 seconds
  • returns to let all Python processes die naturally (tasks get their bash processes killed, the scheduler sends events and runs out of tasks, run_pipeline handles the events and then the scheduler finishes)
  • if any of the children are left over, the alarm kills them with SIGKILL

I will test this in our system for a bit and then see how it goes...

import signal
import os
import sys
import time
import atexit

_ALL_SIGNALS = [signal.SIGTERM, signal.SIGINT, signal.SIGQUIT]

def _signal_all_children(signum: int):
    """Signals all children, starting with the youngest ones"""
    import psutil
    parent = psutil.Process(os.getpid())
    processes = parent.children(recursive=True)
    if processes:
        _log(f"Found children, signaling {signum}...")
    for child in reversed(processes):
        try:
            os.kill(child.pid, signum)
        except OSError:
            # e.g. when the process is already gone as we already killed the child of it
            pass

def _log(msg: str):
    print(msg, flush=True, file=sys.stderr)

def install_gracefully_shutdown_signal_handler():
    # gracefully shutting down:
    # by installing this signal handler, all children also get it. This goes down to the process which runs
    # run_pipeline, the executor started within that function and the TaskProcess started by the executor.
    # Installing a signal handler in the webserver turned out to be totally unreliable, which is why we only install
    # this on a best-effort basis.

    main_process_pid = os.getpid()

    def _SIGKILL_all_children_handler(signum, frame):
        _log(f"Signaling SIGKILL to all children (PID: {os.getpid()})")
        _signal_all_children(signal.SIGKILL)
        return

    def _main_signal_handler(signum, frame):
        """kills children and signals the parent (up to the executor) to shutdown"""
        if hasattr(_main_signal_handler, 'shutdown_flag'):
            # already did the work
            return
        _main_signal_handler.shutdown_flag = True
        _log(f"Received shutdown signal, signaling all children to shutdown (PID: {os.getpid()})")
        _signal_all_children(signal.SIGTERM)
        if main_process_pid != os.getpid():
            # we are in a child -> bubble up the signal until we are in the executor itself
            os.kill(os.getppid(), signal.SIGTERM)
            # no other action, so we do not die in the executor (where events are generated) or in the TaskProcess;
            # both will die naturally when all children have died
        else:
            # we are in the main process, usually whatever runs run_pipeline
            # send the signal again after 1 second to make sure that any newly started child processes also get killed
            # this happens because we cannot prevent the executor from starting new ones unless the executor already
            # has seen an error from the same pipeline. It would be nicer if we could do it directly in the executor...
            time.sleep(1)
            _signal_all_children(signal.SIGTERM)
            _log(f"Scheduling SIGKILL to all children in 10 sec (PID: {os.getpid()})")
            signal.signal(signal.SIGALRM, _SIGKILL_all_children_handler)
            signal.alarm(10)
        # return to let the process receive/send events -> if nothing is there anymore, the process dies in the end...
        return

    def cancel_alarm():
        # if we are about to shut down because all processes already exited without a SIGKILL, we have to disable
        # the alarm which would send this SIGKILL, otherwise we get a bad exit
        signal.alarm(0)

    # on fork, the signal handler stays on children unless overwritten
    try:
        for sig_no in _ALL_SIGNALS:
            signal.signal(sig_no, _main_signal_handler)
        atexit.register(cancel_alarm)
        _log("Installed signal handler to gracefully shutdown the ETL")
    except ValueError:
        # we are in a webserver, you are on your own...
        _log("Not in main thread, not adding signal handler")

martin-loetzsch pushed a commit that referenced this issue Jun 9, 2020
This makes some attempts to clean up during shutdown.

The main thing is the atexit function in the run_pipeline function, which simply closes a run/node_run when shutting down.
The atexit in run() will shut down any still-running processes.
It also attempts to kill the run_process when the run_pipeline process is shut down via Ctrl-C. This should trigger the atexit function in run(), so at least the children are killed. It does not yet attempt to do the same for any other signal.
Also included is a fix to actually check all ancestors of a task when checking whether any of them has already failed -> the effect is that we do not schedule any tasks from an already queued sub-pipeline (like a parallel task) in case the parent of that sub-pipeline has failed.

With this in place I could successfully add a signal handler (not included yet, needs some more testing) which kills the running processes and closes the runs. It also handles Ctrl-C in Flask better; at least I didn't see leftover processes anymore.

partly covers: #40
@martin-loetzsch
Member

@jankatins is this still a problem that justifies the additional complexity?

@jankatins
Member Author

jankatins commented Oct 28, 2020

We currently run with this in prod (see below for the current version), so I have no real clue what would happen if I removed it again. It's possible to add such a handler from "outside" of mara-pipelines, but doing it in mara-pipelines in run_pipeline would give this to everyone who runs mara pipelines via a k8s cron (which can kill and move pods under some circumstances, e.g. out of RAM, or simply because the pod should be moved because the instance is being brought down for some reason).

https://github.com/mara/mara-pipelines/pull/43/files should IMO also still be merged to get rid of some log spam.

Current version of the signal handler installation when the root pipeline is called:

import functools

import mara_pipelines.config
from mara_app.monkey_patch import patch  # assumption: the patch decorator comes from mara_app's monkey_patch module

@patch(mara_pipelines.config.root_pipeline)
@functools.lru_cache(maxsize=None)
def root_pipeline():
    ...
    # install a signal handler to gracefully shut down. Would probably only run once, even without the guard,
    # thanks to the LRU cache
    # DOES NOT INSTALL when run via the flask web UI! 
    from app.signals import install_gracefully_shutdown_signal_handler
    if not hasattr(install_gracefully_shutdown_signal_handler, 'already_installed'):
        setattr(install_gracefully_shutdown_signal_handler, 'already_installed', True)
        install_gracefully_shutdown_signal_handler()

And this is app.signals:

import signal
import os
import sys
import time
import atexit
import psutil

_ALL_SIGNALS = [signal.SIGTERM, signal.SIGINT, signal.SIGQUIT]


def _signal_children(signum: int, parent_pid: int = None, only_tasks=False):
    """Signals all children, starting with the youngest ones"""
    if parent_pid is None:
        parent_pid = os.getpid()
    parent = psutil.Process(parent_pid)
    processes = parent.children(recursive=True)
    for child in reversed(processes):
        if only_tasks:
            if _name_process(child.pid) in ("Main", "Executor"):
                # protect these two so events are handled
                continue

        try:
            os.kill(child.pid, signum)
        except OSError:
            # e.g. when the process is already gone as we already killed the child of it
            pass


def _log(msg: str):
    print(msg, flush=True, file=sys.stderr)


def _name_process(pid: int):
    """Returns a descriptive name for the process with the given pid"""
    # The idea is that we start with a main process (run_pipeline), which starts the Executor (run()),
    # which starts the system stats process and the individual tasks. All of these Python processes share the
    # same "name" (the command line). Within the tasks it's different, and the parent of the main process is different.
    try:
        p = psutil.Process(pid)
        p_name = p.name()
    except psutil.NoSuchProcess:
        # process died already...
        return
    python_name = psutil.Process(os.getpid()).name()

    name_for_level = {
        1: "Main",
        2: "Executor",
        3: "Task/SystemStats",
        4: "Task Child"
    }
    if p_name != python_name:
        # this assumes we only get children as pid arg!
        return name_for_level[4]

    for level in [1, 2, 3]:
        if p.parent().name() != p_name:
            return name_for_level[level]
        p = p.parent()
    # this can happen for python function tasks which open a new process
    return name_for_level[4]


def install_gracefully_shutdown_signal_handler():
    # gracefully shutting down:
    # by installing this signal handler, all children also get it. This goes down to the process which runs
    # run_pipeline, the executor started within that function and the TaskProcess started by the executor.
    # Installing a signal handler in the webserver turned out to be totally unreliable, which is why we only install
    # this on a best-effort basis.
    # if the "highest parent" (which in a normal exec is the process which runs run_pipeline) gets a SIGTERM/SIGINT:
    #  - set 'shutdown_flag' (and return if it is already set, to not do the work twice)
    #  - signal SIGINT to all tasks
    #  - schedule a SIGINT in 3 sec for all children (so including the executor)
    #  - schedule a SIGKILL in 10 sec for all children and raise a KeyboardInterrupt() to shut down the run_pipeline()
    #  - return to let run_pipeline receive the events for all the killed children
    # if any child TaskProcess or the stats process gets a SIGTERM/SIGINT:
    #  - set a "shutdown_flag" (and return if it is already set, to not do the work twice)
    #  - signal SIGINT to the main process (which should be the executor)
    #  - return to let the TaskProcess and the executor send the right events
    installing_process_pid = os.getpid()

    def _SIGKILL_all_children_handler(signum, frame):
        _log(f"Signaling SIGKILL to all children (PID: {os.getpid()})")
        _signal_children(signal.SIGKILL)
        # Go out via the SIGINT/KeyboardInterrupt as mara-pipeline handles that in several places
        raise KeyboardInterrupt()

    def _SIGINT_all_children_handler(signum, frame):
        _log(f"Signaling SIGINT to all children (PID: {os.getpid()})")
        _signal_children(signal.SIGINT)
        _log(f"Scheduling SIGKILL to all children in 10 sec (PID: {os.getpid()})")
        signal.signal(signal.SIGALRM, _SIGKILL_all_children_handler)
        signal.alarm(10)

    def _main_signal_handler(signum, frame):
        """kills children and signals the parent (up to the executor) to shutdown"""
        if hasattr(_main_signal_handler, 'shutdown_flag'):
            # already did the work
            return

        _main_signal_handler.shutdown_flag = True
        _sig_name = signal.Signals(signum).name
        _p_name = _name_process(os.getpid())
        _log(f"Received shutdown signal {_sig_name} in {_p_name} process (PID: {os.getpid()})")
        if installing_process_pid != os.getpid():
            # we are in a child -> bubble up the signal to the parent and the
            # main pid (which should be the executor itself)
            os.kill(installing_process_pid, signal.SIGINT)
            # no other action, so we do not die in the executor (where events are generated) or in the TaskProcess;
            # both will die naturally when all children have died
        else:
            # we are in the main process, usually whatever runs run_pipeline
            _log(f"Shutdown all tasks (PID: {os.getpid()})")
            _signal_children(signal.SIGINT, installing_process_pid, only_tasks=True)
            # send another shutdown signal after 1 second to make sure that any newly started child processes
            # also get killed. this happens because we cannot prevent the executor from starting new ones unless
            # the executor already has seen an error from the same pipeline. It would be nicer if we could do it
            # directly in the executor...
            # While we sleep, no event handling happens!
            time.sleep(1)
            _signal_children(signal.SIGINT, installing_process_pid, only_tasks=True)
            _log(f"Scheduling SIGINT to all children in 3 sec (PID: {os.getpid()})")
            signal.signal(signal.SIGALRM, _SIGINT_all_children_handler)
            signal.alarm(3)
        # return to let the process receive/send events -> if nothing is there anymore, the process dies in the end...
        return

    def cancel_alarm():
        # if we are about to shut down because all processes already exited without a SIGKILL, we have to disable
        # the alarm which would send this SIGKILL, otherwise we get a bad exit
        signal.alarm(0)

    # on fork, the signal handler stays on children unless overwritten
    try:
        for sig_no in _ALL_SIGNALS:
            signal.signal(sig_no, _main_signal_handler)
        atexit.register(cancel_alarm)
        _log("Installed signal handler to gracefully shutdown the ETL")
    except ValueError:
        # we are in a webserver, you are on your own...
        _log("Not in main thread, not adding signal handler")

@leo-schick
Member

I see this as an important topic; it should be part of the mara-pipelines package. I have now started to use Docker projects and see that they use the same approach of sending SIGTERM. In addition, on my current Debian mara server I often have mara tasks which run infinitely because they got stuck somehow... which I then kill from time to time. These jobs are executed via flask mara_pipelines.ui.run --path ....

I think we should write unit tests for these problems and then build a solution from that (test-driven development). When new changes are made to the pipeline execution (see #74 #69), they can easily mess up something that was working before.
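
A sketch of what such a test could look like (the inline child script and the exit-code assertion are illustrative assumptions, not existing project code); it only checks that a process with a SIGTERM handler installed gets to run its cleanup and exit cleanly:

import signal
import subprocess
import sys
import textwrap


def test_sigterm_exits_gracefully(tmp_path):
    """Spawn a child that installs a shutdown handler and loops; sending SIGTERM
    should let it run its cleanup and exit with code 0 instead of being killed
    by the signal (which would show up as a negative return code)."""
    script = tmp_path / "child.py"
    script.write_text(textwrap.dedent("""
        import signal, sys, time

        shutdown = False

        def handler(signum, frame):
            global shutdown
            shutdown = True

        signal.signal(signal.SIGTERM, handler)
        print("ready", flush=True)
        while not shutdown:
            time.sleep(0.1)
        # cleanup (e.g. closing open runs/node_runs) would happen here
        sys.exit(0)
    """))
    child = subprocess.Popen([sys.executable, str(script)], stdout=subprocess.PIPE)
    child.stdout.readline()            # wait until the handler is installed
    child.send_signal(signal.SIGTERM)
    assert child.wait(timeout=10) == 0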

@leo-schick
Member

leo-schick commented Jun 5, 2022

I investigated this a bit now and I personally think we need another solution here, since you can only define a signal handler once in your app code. What we need is a way to communicate with the running pipeline, e.g. via a multiprocessing Queue or Pipe instance: we add a global list somewhere holding all currently running pipeline processes (e.g. including the statistics process).
For the pipeline process it will then be possible to receive the following signals via a Queue/Pipe:

  • STOP - finish all running tasks but do not run any additional nodes (by clearing node_queue)
  • TERMINATE - same as STOP, but additionally sends SIGTERM to all running sub-processes
  • KILL - same as STOP, but additionally sends SIGKILL to all running sub-processes

A public method should be available to send these signals to all running pipelines or to a specific one. Maybe this could even be implemented as a REST API.
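
A rough sketch of how such a control channel could look (the names RunnerCommand, running_pipelines, send_command and handle_commands are made up for illustration and are not part of mara-pipelines):

import enum
import os
import signal
import multiprocessing


class RunnerCommand(enum.Enum):
    STOP = 'stop'            # finish running tasks, don't schedule new nodes
    TERMINATE = 'terminate'  # STOP + SIGTERM to all running sub-processes
    KILL = 'kill'            # STOP + SIGKILL to all running sub-processes


# global registry of currently running pipelines: pipeline id -> command queue
running_pipelines: dict = {}


def send_command(pipeline_id: str, command: RunnerCommand):
    """Public entry point for other code (or a REST endpoint) to control a run."""
    running_pipelines[pipeline_id].put(command)


def handle_commands(command_queue: multiprocessing.Queue, node_queue: list, task_pids: list):
    """Would be called periodically from the executor's main loop."""
    while not command_queue.empty():
        command = command_queue.get_nowait()
        node_queue.clear()  # in all three cases: stop scheduling new nodes
        if command in (RunnerCommand.TERMINATE, RunnerCommand.KILL):
            sig = signal.SIGTERM if command == RunnerCommand.TERMINATE else signal.SIGKILL
            for pid in task_pids:
                try:
                    os.kill(pid, sig)
                except ProcessLookupError:
                    pass  # the task already finished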
