BUG: PUBHandler is not thread-safe when using with async Context #1967

Open
1 task done
rmorshea opened this issue Mar 29, 2024 · 6 comments
Comments

@rmorshea

rmorshea commented Mar 29, 2024

This is a pyzmq bug

  • This is a pyzmq-specific bug, not an issue of zmq socket behavior. Don't worry if you're not sure! We'll figure it out together.

What pyzmq version?

22.3.0

What libzmq version?

4.3.4

Python version (and how it was installed)

Python 3.10 via conda-forge

OS

macOS 14

What happened?

If you send a message from a thread that has no running event loop while using zmq.asyncio.Context, you get an error complaining about a missing event loop, because Socket._Future() requests access to the currently running event loop by default.

In most cases this would be user error, but in the context of logging, it's quite hard to avoid. For example, when interacting with sync library code it's quite common to use asyncio.to_thread to prevent blocking calls. If the library you're calling in a thread then logs, you'll get an error because the thread spawned to do the work doesn't have an event loop.

This has come up in several different ways for me: first when using ddtrace (a Datadog client library), and later in my own library code.
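The underlying condition can be seen without zmq at all. The following is a minimal sketch using only the standard library; the helper name `probe` is made up for illustration. It checks `asyncio.get_running_loop()`, whereas the traceback below goes through `get_event_loop()`, but the cause is the same: the worker thread started by `asyncio.to_thread` has no event loop of its own.

```python
import asyncio


def probe() -> str:
    """Report whether the calling thread has a running event loop."""
    try:
        asyncio.get_running_loop()
        return "loop running"
    except RuntimeError:
        return "no loop"


async def main() -> tuple[str, str]:
    here = probe()                          # called on the event loop thread
    there = await asyncio.to_thread(probe)  # called on a worker thread
    return here, there


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Any code that looks up the current loop from inside the worker thread, as the async socket does when it creates a future, hits the second case.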

Code to reproduce bug

import asyncio
import logging

import zmq
from zmq.asyncio import Context, Socket
from zmq.log.handlers import PUBHandler

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def make_sockets(ctx: Context) -> tuple[Socket, Socket]:
    addr = 'tcp://127.0.0.1'

    first = ctx.socket(zmq.PAIR)
    first.linger = 0
    port = first.bind_to_random_port(addr)

    second = ctx.socket(zmq.PAIR)
    second.linger = 0
    second.connect(f'{addr}:{port}')

    return first, second


async def print_messages(sub: Socket, stop: asyncio.Event):
    while True:
        msg_task = sub.recv_multipart()
        stop_task = asyncio.create_task(stop.wait())
        done, _ = await asyncio.wait({msg_task, stop_task}, return_when=asyncio.FIRST_COMPLETED)
        if stop_task in done:
            break
        print(await msg_task)


async def main():
    with Context() as ctx:
        pub, sub = make_sockets(ctx)

        # background task to read sent messages
        stop_printing = asyncio.Event()
        print_task = asyncio.create_task(print_messages(sub, stop_printing))

        # create some logs
        logger.addHandler(PUBHandler(pub))
        logger.info("hello")  # log outside thread
        await asyncio.to_thread(logger.info, "world")  # log in thread

        # wait for messages to be sent
        await asyncio.sleep(1)

        # stop the background task
        stop_printing.set()
        await print_task


if __name__ == "__main__":
    asyncio.run(main())

Traceback, if applicable

Traceback (most recent call last):
  File "main.py", line 58, in <module>
    asyncio.run(main())
  File "...venv/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "...venv/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "...venv.py", line 47, in main
    await asyncio.to_thread(logger.info, "world")  # log in thread
  File "...venv/lib/python3.10/asyncio/threads.py", line 25, in to_thread
    return await loop.run_in_executor(None, func_call)
  File "...venv/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "...venv/lib/python3.10/logging/__init__.py", line 1477, in info
    self._log(INFO, msg, args, **kwargs)
  File "...venv/lib/python3.10/logging/__init__.py", line 1624, in _log
    self.handle(record)
  File "...venv/lib/python3.10/logging/__init__.py", line 1634, in handle
    self.callHandlers(record)
  File "...venv/lib/python3.10/logging/__init__.py", line 1696, in callHandlers
    hdlr.handle(record)
  File "...venv/lib/python3.10/logging/__init__.py", line 968, in handle
    self.emit(record)
  File "...venv/lib/python3.10/site-packages/zmq/log/handlers.py", line 186, in emit
    self.socket.send_multipart([btopic, bmsg])
  File "...venv/lib/python3.10/site-packages/zmq/_future.py", line 321, in send_multipart
    return self._add_send_event('send_multipart', msg=msg_parts, kwargs=kwargs)
  File "...venv/lib/python3.10/site-packages/zmq/_future.py", line 509, in _add_send_event
    f = future or self._Future()
  File "...venv/lib/python3.10/asyncio/events.py", line 656, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'asyncio_0'.

More info

I'm currently using the following as a workaround:

import asyncio
import logging
from typing import Any

from zmq.log.handlers import PUBHandler as _PUBHandler


class PUBHandler(_PUBHandler):

    def __init__(self, *args: Any, loop: asyncio.AbstractEventLoop | None = None, **kwargs: Any) -> None:
        self.event_loop = loop or _try_get_running_loop()
        super().__init__(*args, **kwargs)

    def emit(self, record: logging.LogRecord) -> None:
        if self.event_loop is None:
            super().emit(record)
        else:
            self.event_loop.call_soon_threadsafe(super().emit, record)


def _try_get_running_loop() -> asyncio.AbstractEventLoop | None:
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        return None
@minrk
Member

minrk commented Mar 31, 2024

I don't think PUBHandler should accept async sockets, so the fix is probably to cast async sockets to sync sockets if they are given. The workaround is to do it yourself before passing to PUBHandler, I think.
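A minimal sketch of doing that cast yourself, assuming pyzmq's `zmq.Socket.shadow` classmethod and the `underlying` property (the address of the libzmq socket). The helper name `sync_view` is hypothetical and not part of pyzmq. The shadow shares the same libzmq socket as the async one, so this only removes the event loop lookup; it does not make the socket safe to use from several threads at once.

```python
import asyncio

import zmq
import zmq.asyncio


def sync_view(sock: zmq.Socket) -> zmq.Socket:
    """Return a plain blocking socket for `sock` (hypothetical helper).

    Async sockets are shadowed, so both objects share one libzmq socket;
    plain sockets are returned unchanged.
    """
    if isinstance(sock, zmq.asyncio.Socket):
        return zmq.Socket.shadow(sock.underlying)
    return sock


async def demo() -> bool:
    ctx = zmq.asyncio.Context()
    pub = ctx.socket(zmq.PUB)
    try:
        plain = sync_view(pub)
        # Same libzmq socket, but wrapped in the non-async class.
        return type(plain) is zmq.Socket and plain.underlying == pub.underlying
    finally:
        pub.close(linger=0)
        ctx.term()


# Intended use (not run here): logger.addHandler(PUBHandler(sync_view(pub)))
```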

@rmorshea
Author

rmorshea commented Apr 1, 2024

Why shouldn't the PUBHandler accept an async socket? It seems like it would be valuable to avoid blocking calls to the socket whenever you hit log statements.

@minrk
Member

minrk commented Apr 2, 2024

> Why shouldn't the PUBHandler accept an async socket?

For one, PUB sockets effectively never block anyway. If they are backed up, they drop messages instead of blocking. Plus, in general, zmq sockets (at the libzmq level) are not threadsafe, so I think it is not actually safe to use PUBHandler with any logger that may be used concurrently from multiple threads with any kind of socket. In that way, I suppose using call_soon_threadsafe might actually be safer than a sync socket. But then you'd need to be sure you are connecting to the right event loop, and I don't think PUBHandler has enough information to do that (e.g. consecutive calls to asyncio.run which are in one thread, but two event loops).

Cases to consider:

  • __init__ called when no event loop is running, but emit called when it is (common in application setup for log handlers)
  • multiple calls to asyncio.run, each of which create and destroy an event loop
  • multi-threaded applications (zmq sockets are not threadsafe)

A separate AsyncPUBHandler that has declared semantics (i.e. attaches to the running loop at start and requires that the loop is running) could make sense, though I think adopting that behavior in the base PUBHandler would cause problems in the first two cases.
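For the multi-threaded case, one option that is not proposed in this thread is the standard library's `logging.handlers.QueueHandler` and `QueueListener`: every thread only puts records on a queue, and a single listener thread calls the real handler. The sketch below uses a stand-in `RecordingHandler` (hypothetical) where a socket-backed handler would go, so it runs without zmq; whether this fits PUBHandler in practice is an assumption.

```python
import logging
import logging.handlers
import queue
import threading


class RecordingHandler(logging.Handler):
    """Stand-in for a socket-backed handler; notes which thread called emit."""

    def __init__(self) -> None:
        super().__init__()
        self.seen: list[tuple[str, str]] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.seen.append((record.getMessage(), threading.current_thread().name))


def demo() -> list[tuple[str, str]]:
    target = RecordingHandler()
    records: queue.Queue = queue.Queue()
    listener = logging.handlers.QueueListener(records, target)

    log = logging.getLogger("queue-demo")
    log.setLevel(logging.INFO)
    log.propagate = False
    log.addHandler(logging.handlers.QueueHandler(records))

    listener.start()
    workers = [
        threading.Thread(target=log.info, args=(f"msg {i}",)) for i in range(4)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    listener.stop()  # drains the queue, then joins the listener thread
    return target.seen
```

Only the listener thread ever calls `emit` on the target handler, which is the property a non-thread-safe socket needs.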

@rmorshea
Author

rmorshea commented Apr 2, 2024

I think a separate AsyncPUBHandler makes a lot of sense.

One way to address the first concern might be to copy asyncio.mixins._LoopBoundMixin which implements the following in order to lazily determine the loop to use:

class _LoopBoundMixin:
    _loop = None

    def _get_loop(self):
        loop = events._get_running_loop()

        if self._loop is None:
            with _global_lock:
                if self._loop is None:
                    self._loop = loop
        if loop is not self._loop:
            raise RuntimeError(f'{self!r} is bound to a different event loop')
        return loop

However, I could see this being a bit challenging to reason about since it wouldn't be immediately obvious what loop the handler would be bound to. The only way to find out would be to read the code to determine what loop is active when the first log is produced.

As a compromise, perhaps a loop argument to AsyncPUBHandler could accept:

  • AbstractEventLoop - bind to the explicitly given loop
  • None - bind to the currently running loop (probably the default)
  • "lazy" - bind to the currently running loop when the first log is handled
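The three options above could look roughly like this. It is a sketch under assumptions, not the proposed AsyncPUBHandler: the class name `LoopBoundHandler` and its `send` callback are hypothetical, and `send` stands in for the socket send so the example runs without zmq.

```python
import asyncio
import logging
from typing import Callable, Literal, Union

LoopArg = Union[asyncio.AbstractEventLoop, None, Literal["lazy"]]


class LoopBoundHandler(logging.Handler):
    """Hypothetical handler that runs `send(text)` on one chosen event loop."""

    def __init__(self, send: Callable[[str], object], loop: LoopArg = None) -> None:
        super().__init__()
        self._send = send
        if loop == "lazy":
            self._loop = None  # resolved when the first record is handled
        elif loop is None:
            # Bind now; raises RuntimeError if no loop is running.
            self._loop = asyncio.get_running_loop()
        else:
            self._loop = loop  # explicitly given loop

    def emit(self, record: logging.LogRecord) -> None:
        # logging.Handler.handle() holds the handler lock around emit(),
        # so the lazy binding below is not raced by other threads.
        try:
            if self._loop is None:
                self._loop = asyncio.get_running_loop()
            self._loop.call_soon_threadsafe(self._send, self.format(record))
        except Exception:
            # E.g. lazy binding attempted from a thread with no loop,
            # or the bound loop is already closed.
            self.handleError(record)


async def demo() -> list[str]:
    sent: list[str] = []
    handler = LoopBoundHandler(sent.append, loop="lazy")
    log = logging.getLogger("loop-bound-demo")
    log.setLevel(logging.INFO)
    log.propagate = False
    log.addHandler(handler)

    log.info("hello")                           # binds to the running loop
    await asyncio.to_thread(log.info, "world")  # forwarded from a worker thread
    await asyncio.sleep(0.01)                   # let queued callbacks run
    log.removeHandler(handler)
    return sent
```

With `"lazy"`, a first record logged from a thread without a loop cannot bind and is reported through `handleError`, which illustrates why the lazy mode is harder to reason about than the other two.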

@minrk
Member

minrk commented Apr 3, 2024

That sounds reasonable. Want to have a go? I'm hoping to release pyzmq 26 soon, and imagine 26.1 will likely come pretty soon after with fixes for issues that get reported with the new build system.

@rmorshea
Author

rmorshea commented Apr 5, 2024

Great. Will try and take a crack at it when I find time.
