Don't print full KeyboardInterrupt Exceptions #43

Open
wants to merge 2 commits into base: master

Conversation

@jankatins (Member) commented Jun 10, 2020

I found two places where such stack traces showed up:

  • when the Ctrl+C happened in the system stats process
  • when an error happened in the event handler (run_pipeline) and the final print would print it.

This should cover the most likely cases where this can occur.

Closes: #42
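
For reference, the pattern in both places is roughly the following (a minimal sketch with a made-up helper name, not the literal diff of this PR):

import sys

def run_and_print_errors(fn, *args, **kwargs):
    """Sketch: run a step and report failures, but keep Ctrl+C to a single line."""
    try:
        return fn(*args, **kwargs)
    except KeyboardInterrupt:
        # no full traceback, just a short note
        print("Interrupted", file=sys.stderr)
    except Exception as e:
        print(f"Error: {e!r}", file=sys.stderr)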

@martin-loetzsch (Member)

There's unfortunately a third place:

  File "/Users/mloetzsch/Projects/project-a/mara-example-project-2/packages/data-integration/data_integration/execution.py", line 348, in run_pipeline
    _notify_all(event)
  File "/Users/mloetzsch/Projects/project-a/mara-example-project-2/packages/data-integration/data_integration/execution.py", line 340, in _notify_all
    raise e
  File "/Users/mloetzsch/Projects/project-a/mara-example-project-2/packages/data-integration/data_integration/execution.py", line 333, in _notify_all
    runlogger.handle_event(event)
  File "/Users/mloetzsch/Projects/project-a/mara-example-project-2/packages/data-integration/data_integration/logging/run_log.py", line 127, in handle_event
    with mara_db.postgresql.postgres_cursor_context('mara') as cursor:  # type: psycopg2.extensions.cursor
  File "/usr/local/Cellar/python@3.8/3.8.1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/contextlib.py", line 113, in __enter__
    return next(self.gen)
  File "/Users/mloetzsch/Projects/project-a/mara-example-project-2/packages/mara-db/mara_db/postgresql.py", line 18, in postgres_cursor_context
    connection = psycopg2.connect(dbname=db.database, user=db.user, password=db.password,
  File "/Users/mloetzsch/Projects/project-a/mara-example-project-2/.venv/lib/python3.8/site-packages/psycopg2/__init__.py", line 127, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
KeyboardInterrupt

I'm wondering if we can handle Ctrl+C in a similar way to other interrupts.

The doc at https://docs.python.org/3.8/library/signal.html says "SIGINT is translated into a KeyboardInterrupt exception if the parent process has not changed it."

So I'd say we try to "change it".
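
For illustration, "changing it" could look roughly like this (a minimal sketch, not part of this PR): install a SIGINT handler that exits with a one-line message instead of letting Python raise KeyboardInterrupt.

import signal
import sys

def _quiet_sigint(signum, frame):
    # exit with a one-line message instead of a KeyboardInterrupt traceback
    print("Interrupted", file=sys.stderr)
    sys.exit(1)

signal.signal(signal.SIGINT, _quiet_sigint)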

@jankatins (Member, Author) commented Jun 11, 2020

The problem is that other signals are even worse to handle directly in the code. In the signal handler we are using now, I basically translate any received signal to SIGINT and then handle that, as the rest of the mara code already has some places where KeyboardInterrupt is handled.

Some context:

This PR should make all stack traces for Ctrl+C go away. Adding a signal handler in Flask is not possible, at least not in all processes, because a) we run in a thread at the point where we could install one in run_pipeline, and you cannot install a signal handler in a non-main thread, and b) if you try to do it at import time, it interferes with Flask's own shutdown sequence, which expects a KeyboardInterrupt.

For the use case of the command-line pipeline runner, we can install it in run_pipeline (and we now install the code below in root_pipeline(), which is run in that main process). If you want to shut down "cleanly" (= kill all tasks, register the bad output in the inner run() process, send it as events to the event handler in run_pipeline(), and persist/forward it to the frontend), then the code below does what's needed for that. It basically first kills the tasks with SIGINT (nice because psql cancels the current query before exiting), then waits a few seconds, kills all other children with SIGINT, and then raises a SIGINT so that the main run_pipeline loop also exits cleanly. It also makes sure that the signal handler in run_pipeline orchestrates this by making any other process forward the signal to the run_pipeline process. It also now makes sure that we close all open runs/node_runs, thanks to the KeyboardInterrupt handler and atexit code.

I'm not really happy with the code, as it became bigger with each missing piece we found, but it does cleanly shut down our system, so we now have at least some debugging output about what's going on in case our nice and shiny k8s environment decides to kill us for some (still unknown) reason...

import signal
import os
import sys
import time
import atexit
import psutil

_ALL_SIGNALS = [signal.SIGTERM, signal.SIGINT, signal.SIGQUIT]


def _signal_children(signum: int, parent_pid: int = None, only_tasks=False):
    """Signals all children, starting with the youngest ones"""
    if parent_pid is None:
        parent_pid = os.getpid()
    parent = psutil.Process(parent_pid)
    processes = parent.children(recursive=True)
    for child in reversed(processes):
        if only_tasks:
            if _name_process(child.pid) in ("Main", "Executor"):
                # protect these two so events are handled
                continue

        try:
            os.kill(child.pid, signum)
        except Exception:
            # e.g. when the process is already gone because we already killed its child
            pass


def _log(msg: str):
    print(msg, flush=True, file=sys.stderr)


def _name_process(pid: int):
    """Names the process of the pid"""
    # The idea is that we start with a main process (run_pipeline), which starts the Executor (run())
    # which starts the system stats process and the individual tasks. All of these python processes share the same
    # "name" (the commandline). Within the tasks it's different and the process parent of the main process is different.
    try:
        p = psutil.Process(pid)
        p_name = p.name()
    except psutil.NoSuchProcess:
        # process died already...
        return
    python_name = psutil.Process(os.getpid()).name()

    name_for_level = {
        1: "Main",
        2: "Executor",
        3: "Task/SystemStats",
        4: "Task Child"
    }
    if p_name != python_name:
        # this assumes we only get children as pid arg!
        return name_for_level[4]

    for level in [1, 2, 3]:
        if p.parent().name() != p_name:
            return name_for_level[level]
        p = p.parent()
    # this can happen for python function tasks which open a new process
    return name_for_level[4]


def install_gracefully_shutdown_signal_handler():
    # gracefully shutting down:
    # by installing this signal handler, all children also get it. This goes down to the process which runs
    # run_pipeline, the executor started within that function and the TaskProcess started by the executor.
    # Installing a signal handler in the webserver turned out to be totally unreliable which is why we only install
    # this on a best-effort basis.
    # if the "highest parent" (which in a normal exec is the process which runs run_pipeline) get a SIGTERM/SIGINT:
    #  - set 'shutdown_flag' (and return if it is already set to not do the work twice)
    #  - signal SIGINT to all tasks
    #  - schedule a SIGINT in 2 sec for all children (so including the executor)
    #  - schedule a SIGKILL in 10 sec for all children and raise a KeyboardInterrupt() to shut down the run_pipeline()
    #  - return to let the run_pipeline receive the events for all the killed children
    # if any child TaskProcess or the stats process gets a SIGTERM/SIGINT:
    #  - set a "shutdown_flag" (and return if it is already set to not do the work twice)
    #  - signal SIGINT to the main process (which should be the executor)
    #  - return to let the TaskProcess and the executor send the right events
    installing_process_pid = os.getpid()

    def _SIGKILL_all_children_handler(signum, frame):
        _log(f"Signaling SIGKILL to all children (PID: {os.getpid()})")
        _signal_children(signal.SIGKILL)
        raise KeyboardInterrupt()

    def _SIGINT_all_children_handler(signum, frame):
        _log(f"Signaling SIGINT to all children (PID: {os.getpid()})")
        _signal_children(signal.SIGINT)
        _log(f"Scheduling SIGKILL to all children in 10 sec (PID: {os.getpid()})")
        signal.signal(signal.SIGALRM, _SIGKILL_all_children_handler)
        signal.alarm(10)

    def _main_signal_handler(signum, frame):
        """kills children and signals the parent (up to the executor) to shutdown"""
        if hasattr(_main_signal_handler, 'shutdown_flag'):
            # already did the work
            return

        _main_signal_handler.shutdown_flag = True
        _sig_name = signal.Signals(signum).name
        _p_name = _name_process(os.getpid())
        _log(f"Received shutdown signal {_sig_name} in {_p_name} process (PID: {os.getpid()})")
        if installing_process_pid != os.getpid():
            # we are in a child -> bubble up the signal to the parent and the
            # main pid (which should be the executor itself)
            os.kill(installing_process_pid, signal.SIGINT)
            # no other action to not die in the executor where events are generated and the TaskProcess
            # both will die naturally when all children have died
        else:
            # we are in the main process, usually whatever runs run_pipeline
            _log(f"Shutdown all tasks (PID: {os.getpid()})")
            _signal_children(signal.SIGINT, installing_process_pid, only_tasks=True)
            # send another shutdown signal after 1 second to make sure that any newly started child processes
            # also get killed. This happens because we cannot prevent the executor from starting new ones unless
            # the executor already has seen an error from the same pipeline. It would be nicer if we could do it
            # directly in the executor...
            # While we sleep, no event handling happens!
            time.sleep(1)
            _signal_children(signal.SIGINT, installing_process_pid, only_tasks=True)
            _log(f"Scheduling SIGNINT to all children in 3 sec (PID: {os.getpid()})")
            signal.signal(signal.SIGALRM, _SIGINT_all_children_handler)
            signal.alarm(3)
        # return to let the process receive/send events -> if nothing is there anymore, the process dies in the end...
        return

    def cancel_alarm():
        # if we are about to shut down because all processes already exited without a SIGKILL, we have to disable
        # the alarm which would send this SIGKILL, otherwise we get a bad exit
        signal.alarm(0)

    # on fork, the signal handler stays on children unless overwritten
    try:
        for sig_no in _ALL_SIGNALS:
            signal.signal(sig_no, _main_signal_handler)
        atexit.register(cancel_alarm)
        _log("Installed signal handler to gracefully shutdown the ETL")
    except ValueError:
        # we are in a webserver, you are on your own...
        _log("Not in main thread, not adding signal handler")

@jankatins (Member, Author) commented Jun 11, 2020

There's a unfortunately a third place:

Hm, I thought I had that covered by the isinstance() check in the print?

The stack trace doesn't really make sense; it looks like the exception is raised during the handling of the first exception (did you press Ctrl+C twice?), but that one should still end up in a single line of Interrupted and not a whole stack trace.
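
Roughly, the check in question looks like this (a sketch with a made-up function name, not the exact code in the PR):

import sys
import traceback

def print_final_exception(e: BaseException):
    # print only a single line for Ctrl+C, the full traceback otherwise
    if isinstance(e, KeyboardInterrupt):
        print("Interrupted", file=sys.stderr)
    else:
        traceback.print_exception(type(e), e, e.__traceback__)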

@jankatins force-pushed the no_more_keyboard_interrupt_stack_traces branch from 1f736a5 to 38eb8bb on July 3, 2020 12:32
@martin-loetzsch (Member)

@jankatins are you running that in production?

The previous changes you made around cancelling runs really improved things a lot, but here I'm afraid we add a lot of complexity for little gain.

@jankatins (Member, Author)

The two commits in this PR only avoid printing stack traces in two places and were just meant to clean up such stack traces as requested in #42. The change looks big because of the indentation (wrapping something in try: ... except ...). We are not running this PR in production, though.

The code example for the signal handler here, on the other hand, is run in production (installed in root_pipeline()). It needs to be there if the pipeline runs happen in a setting where they can be killed at any time (a k8s cluster which moves pods around or simply shuts down during upgrades), so that as many runs as possible can be closed (SIGKILL is of course another matter). We use open runs as a lock to not run an ETL in parallel (via an extended run command) and in Prometheus metrics ("what is currently running" and things like "how many failures", or alerts when a pipeline run takes more than 50% longer than the average of the last 2 weeks). So "not closing" the runs broke stuff...
