Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CAproto asyncio client going into endless loop on teardown when used with pytest #847

Open
codedump opened this issue Apr 22, 2024 · 1 comment

Comments

@codedump
Copy link

Hello,

here is a bug in the caproto.asyncio infrastructure.

Consider the following pytest unit test, based on caproto:

import asyncio, pytest

from caproto.asyncio.client import Context as ClientContext
from caproto.asyncio.server import Context as ServerContext
from caproto.server import PVGroup, pvproperty

from caproto.sync.client import read as ca_read

import multiprocessing as mp
#mp.set_start_method("spawn")

class Server(PVGroup):
    foo = pvproperty(value=3.14)

async def run_ioc():
    srv = Server(prefix="catest:")
    print(f'PVs: {srv.pvdb}')
    ctx = ServerContext(srv.pvdb)
    await ctx.run()

def run_server():
    asyncio.run(run_ioc())


@pytest.mark.asyncio
async def test_foo():

    p = mp.Process(target=run_server)
    p.start()

    await asyncio.sleep(1.0)    

    if False:  #True:
        ctx = ClientContext()
        foo, = await ctx.get_pvs('catest:foo')
        d = await foo.read()
    else:
        d = ca_read('catest:foo').data[0]
    
    print(f'Read: {d}')

    p.kill()
    p.join()

As it is (i.e. spawning an asyncio IOC with the PV "catest:foo" and reading it with the synchronous client), the test behaves as it should:

$ pytest -s ./ca-test.py                                                                                                      
============================================================= test session starts =============================================================     
platform linux -- Python 3.12.2, pytest-8.1.1, pluggy-1.4.0                                                                                         
rootdir: /var/home/florin/tmp/ca-test                                                                                                               
plugins: cov-5.0.0, asyncio-0.23.6                                                                                                                  
asyncio: mode=Mode.STRICT                                                                                                                           
collected 1 item                                                                                                                                    
                                                                                                                                                    
ca-test.py PVs: OrderedDict({'catest:foo': <caproto.server.server.PvpropertyDouble object at 0x7f11995e6690>})                                      
Read: 3.14                                                                                                                                          
.                                                                                                                                                   

============================================================== warnings summary ===============================================================     
../../projects/udkm/caproto/caproto/_constants.py:11                      
  /var/home/florin/projects/udkm/caproto/caproto/_constants.py:11: DeprecationWarning: datetime.datetime.utcfromtimestamp() is deprecated and schedu
led for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.fromtimestamp(timestamp, datetime.U
TC).                                 
    EPICS_EPOCH = datetime.datetime.utcfromtimestamp(EPICS2UNIX_EPOCH)    

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html                                                                             
======================================================== 1 passed, 1 warning in 1.18s =========================================================

(There are a number of deprecation warnings as to API elements of more recent Python versions, but that isn't an issue -- yet).

Now if you change the line that says if False: to if True:, i.e. deactivating the synchronous client and building an asyncio client context instead, eventually this happens:

[...]
Circuit command evaluation failed: ReadNotifyResponse(data=array([3.14]), data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name=
'ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'),
 ioid=0, metadata=None)                                                                                                                             
Traceback (most recent call last):                                                                                                                  
  File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py", line 1215, in _command_queue_loop                                        
    command = await self.command_queue.async_get()                                                                                                  
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                                  
  File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py", line 28, in async_get                                                     
    return await self._queue.get()                                                                                                                  
           ^^^^^^^^^^^^^^^^^^^^^^^                                                                                                                  
  File "/usr/lib64/python3.12/asyncio/queues.py", line 155, in get                                                                                  
    getter = self._get_loop().create_future()                                                                                                       
             ^^^^^^^^^^^^^^^^                                                                                                                       
  File "/usr/lib64/python3.12/asyncio/mixins.py", line 20, in _get_loop                                                                             
    raise RuntimeError(f'{self!r} is bound to a different event loop')                                                                              
RuntimeError: <Queue at 0x7f1ca7beb5f0 maxsize=0 _getters[1] tasks=4> is bound to a different event loop
[...]

This is repeated over and over again, in an endless loop.

It isn't difficult to trace this to line caproto/asyncio/client.py:1225, and explicitly catching a RuntimeError and breaking the loop actually kind-of fixes the issue. But then this happens:

Exception ignored in: <coroutine object Context._process_search_results_loop at 0x7fe6ef3ff8b0>                                                     
Traceback (most recent call last):                                                                                                                  
  File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py", line 812, in _process_search_results_loop                                
    address, names = await self._search_results_queue.async_get()                                                                                   
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                   
  File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py", line 28, in async_get                                                     
    return await self._queue.get()                                                                                                                  
           ^^^^^^^^^^^^^^^^^^^^^^^                                                                                                                  
  File "/usr/lib64/python3.12/asyncio/queues.py", line 160, in get                                                                                  
    getter.cancel()  # Just in case getter is not done yet.                                                                                         
    ^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.12/asyncio/base_events.py", line 793, in call_soon
    self._check_closed()
  File "/usr/lib64/python3.12/asyncio/base_events.py", line 540, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending! 
task: <Task pending name='Task-2' coro=<Context._process_search_results_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py:807> wait_for=<Future cancelled> cb=[_TaskHandler._remove_completed_task()]>
Task was destroyed but it is pending! 
task: <Task pending name='Task-3' coro=<Context._activate_subscriptions_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py:789> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_TaskHandler._remove_completed_task()]>
Task was destroyed but it is pending! 
task: <Task pending name='Task-4' coro=<SharedBroadcaster._broadcaster_retry_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py:446> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_TaskHandler._remove_completed_task()]>
Exception ignored in: <coroutine object SharedBroadcaster._broadcaster_receive_loop at 0x7fe6ee72dea0>
Traceback (most recent call last):
  File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py", line 217, in _broadcaster_receive_loop
    _, bytes_received, address = await self.receive_queue.async_get()
                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py", line 28, in async_get
    return await self._queue.get()
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.12/asyncio/queues.py", line 160, in get
    getter.cancel()  # Just in case getter is not done yet.
    ^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.12/asyncio/base_events.py", line 793, in call_soon
    self._check_closed()
  File "/usr/lib64/python3.12/asyncio/base_events.py", line 540, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending! 
task: <Task pending name='Task-5' coro=<SharedBroadcaster._broadcaster_receive_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py:213> wait_for=<Future cancelled> cb=[_TaskHandler._remove_completed_task()]>
Task was destroyed but it is pending! 
task: <Task pending name='Task-6' coro=<SharedBroadcaster._check_for_unresponsive_servers_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py:363> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_TaskHandler._remove_completed_task()]>
Exception ignored in: <coroutine object _CallbackExecutor._callback_loop at 0x7fe6ee6fb450>
Traceback (most recent call last):
  File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py", line 222, in _callback_loop
    callback, args, kwargs = await self.callbacks.async_get()
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py", line 28, in async_get
    return await self._queue.get()
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.12/asyncio/queues.py", line 160, in get
    getter.cancel()  # Just in case getter is not done yet.
    ^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.12/asyncio/base_events.py", line 793, in call_soon
    self._check_closed()
  File "/usr/lib64/python3.12/asyncio/base_events.py", line 540, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending name='Task-12' coro=<_CallbackExecutor._callback_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py:214> wait_for=<Future cancelled> cb=[_TaskHandler._remove_completed_task()]>
Task was destroyed but it is pending!
task: <Task pending name='Task-20' coro=<VirtualCircuitManager._transport_receive_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py:977> wait_for=<Future finished result=None> cb=[_TaskHandler._remove_completed_task()]>
Task was destroyed but it is pending!
task: <Task pending name='Task-21' coro=<VirtualCircuitManager._command_queue_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py:1211> wait_for=<Future cancelled> cb=[_TaskHandler._remove_completed_task()]>

I.e. a bunch of errors come up, to my understanding related to the fact that there are still asyncio tasks running that haven't been properly shut down.

The unit test does what it should, though (output is similarly to the synchronous case) and concludes as passing. It's just the giant error message that's messy.

Adding a ctx.disconnect(), so that the relevant part of the test reads like this:

    [...]
    if True:
        ctx = ClientContext()
        foo, = await ctx.get_pvs('catest:foo')
        d = await foo.read()
        await ctx.disconnect()
    [...]

Then the output of the pytest run looks more sane:

[...]
Task was destroyed but it is pending!
task: <Task pending name='Task-21' coro=<VirtualCircuitManager._command_queue_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py:1211> wait_for=<Future finished result=None> cb=[_TaskHandler._remove_completed_task()]>

This happens both with the patched version of CAproto (i.e. the one with my ad-hoc "...except RuntimeError" modification) and with pristine CAproto from today's checkout.

I'm not sure what the correct behavior (from caproto or from the programmer :-) ) would be. I think I can identify at least the following issues:

  • The continue statement in asyncio/client.py, which makes the loop go on on RuntimeError -- it behaves correctly within a regular application (e.g. a caproto client application, or a caproto IOC which also monitors some other PV, thus also serving as an async client); it doesn't crash. But it crashes in a pytest. I think that a program that runs "successfully" should also pass a pytest.
  • Is await-ing a ctx.disonnect() on a client an official requirement now? Or not?
  • The error message when a ctx.disconnect() was issued (Task was destroyed but it is pending! error when using with pytest and a ctx.disconenct()) feels like it shouldn't be there. There's likely a .cancel() to be issued, and an asyncio.CancelledError to be caught in there somewhere.
  • What would be the side effects of the ...except RuntimeError in the main asyncio client loop? Obviously it would outright crash more often, but... is there a known case where this kind of crash is not desired? Is there a reason why every other error is accepted and continue'd upon, or is it just a temporary development measure trying to figure out what could go wrong?

I can invest a bit of time in trying to properly fix this, but I need a bit of insight as to the inventor's intention here :-) Does anyone (@tacaswell?) have any pointers as to how to unwind this (except for "make sure your applications call ctx.disconnect()") on the CAproto side? Is just looking for the rogue task, starting in VirtualCircuitManager._command_queue_loop(), essentially all that should be done?

Thanks & Cheers,
F.

codedump pushed a commit to codedump/caproto that referenced this issue Apr 22, 2024
@codedump
Copy link
Author

I've uploaded my patch here (please let me know if you'd like this handled differently).

The full error message when the pytest fails is this (previously impossible to see because of the endless loop):

pytest -s ./ca-test.py                                                                                                      
=============================================================== test session starts ================================================================
platform linux -- Python 3.12.2, pytest-8.1.1, pluggy-1.4.0                                                                                         
rootdir: /var/home/florin/tmp/ca-test                                                                                                               
plugins: cov-5.0.0, asyncio-0.23.6                                                                                                                  
asyncio: mode=Mode.STRICT                                                                                                                           
collected 1 item                                                                                                                                    
                                                                                                                                                    
ca-test.py PVs: OrderedDict({'catest:foo': <caproto.server.server.PvpropertyDouble object at 0x7f20f32cb6b0>})                                      
Read: ReadNotifyResponse(data=array([3.14]), data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_wi
th_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=0, metadata=None)      
.                                                                                                                                                   
                                                                                                                                                    
================================================================= warnings summary =================================================================
../../projects/udkm/caproto/caproto/_constants.py:11                                                                                                
  /var/home/florin/projects/udkm/caproto/caproto/_constants.py:11: DeprecationWarning: datetime.datetime.utcfromtimestamp() is deprecated and schedu
led for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.fromtimestamp(timestamp, datetime.U
TC).                                                                                                                                                
    EPICS_EPOCH = datetime.datetime.utcfromtimestamp(EPICS2UNIX_EPOCH)                                                                              
                                                                                                                                                    
-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html                                                                             
=========================================================== 1 passed, 1 warning in 1.17s ===========================================================
Exception ignored in: <coroutine object Context._process_search_results_loop at 0x7f1fee7ef8b0>                                                     
Traceback (most recent call last):                                                                                                                  
  File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py", line 813, in _process_search_results_loop                                
    address, names = await self._search_results_queue.async_get()                                                                                   
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                   
  File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py", line 28, in async_get                                                     
    return await self._queue.get()                                                                                                                  
           ^^^^^^^^^^^^^^^^^^^^^^^                                                                                                                  
  File "/usr/lib64/python3.12/asyncio/queues.py", line 160, in get                                                                                  
    getter.cancel()  # Just in case getter is not done yet.                                                                                         
    ^^^^^^^^^^^^^^^                                                                                                                                 
  File "/usr/lib64/python3.12/asyncio/base_events.py", line 793, in call_soon                                                                       
    self._check_closed()                                                                                                                            
  File "/usr/lib64/python3.12/asyncio/base_events.py", line 540, in _check_closed                                                                   
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed                                                                                                                  
Task was destroyed but it is pending!                                                                                                               
task: <Task pending name='Task-2' coro=<Context._process_search_results_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/async
io/client.py:808> wait_for=<Future cancelled> cb=[_TaskHandler._remove_completed_task()]>                                                           
Task was destroyed but it is pending!                                                                                                               
task: <Task pending name='Task-3' coro=<Context._activate_subscriptions_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/async
io/client.py:790> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_TaskHandler._remove_completed_task()]>                                     
Task was destroyed but it is pending!                                                                                                               
task: <Task pending name='Task-4' coro=<SharedBroadcaster._broadcaster_retry_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/
asyncio/client.py:447> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_TaskHandler._remove_completed_task()]>                                
Exception ignored in: <coroutine object SharedBroadcaster._broadcaster_receive_loop at 0x7f1fedd1dea0>                                              
Traceback (most recent call last):                                                                                                                  
  File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py", line 218, in _broadcaster_receive_loop                                   
    _, bytes_received, address = await self.receive_queue.async_get()                                                                               
                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py", line 28, in async_get
    return await self._queue.get()
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.12/asyncio/queues.py", line 160, in get
    getter.cancel()  # Just in case getter is not done yet.
    ^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.12/asyncio/base_events.py", line 793, in call_soon
    self._check_closed()
  File "/usr/lib64/python3.12/asyncio/base_events.py", line 540, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending! 
task: <Task pending name='Task-5' coro=<SharedBroadcaster._broadcaster_receive_loop() done, defined at /var/home/florin/projects/udkm/caproto/caprot
o/asyncio/client.py:214> wait_for=<Future cancelled> cb=[_TaskHandler._remove_completed_task()]>
Task was destroyed but it is pending! 
task: <Task pending name='Task-6' coro=<SharedBroadcaster._check_for_unresponsive_servers_loop() done, defined at /var/home/florin/projects/udkm/cap
roto/caproto/asyncio/client.py:364> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_TaskHandler._remove_completed_task()]>
Exception ignored in: <coroutine object _CallbackExecutor._callback_loop at 0x7f1fedaeb560>
Traceback (most recent call last):
  File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py", line 222, in _callback_loop
    callback, args, kwargs = await self.callbacks.async_get()
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py", line 28, in async_get
    return await self._queue.get()
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.12/asyncio/queues.py", line 160, in get
    getter.cancel()  # Just in case getter is not done yet.
    ^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.12/asyncio/base_events.py", line 793, in call_soon
    self._check_closed()
  File "/usr/lib64/python3.12/asyncio/base_events.py", line 540, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending! 
task: <Task pending name='Task-12' coro=<_CallbackExecutor._callback_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/
utils.py:214> wait_for=<Future cancelled> cb=[_TaskHandler._remove_completed_task()]>
Task was destroyed but it is pending! 
task: <Task pending name='Task-20' coro=<VirtualCircuitManager._transport_receive_loop() done, defined at /var/home/florin/projects/udkm/caproto/cap
roto/asyncio/client.py:978> wait_for=<Future finished result=None> cb=[_TaskHandler._remove_completed_task()]>
Probably an asyncio error: Event loop is closed. Traceback: Traceback (most recent call last):
  File "/usr/lib64/python3.12/asyncio/queues.py", line 158, in get
    await getter
GeneratorExit

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py", line 1216, in _command_queue_loop
    command = await self.command_queue.async_get()
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py", line 28, in async_get
    return await self._queue.get()
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.12/asyncio/queues.py", line 160, in get
    getter.cancel()  # Just in case getter is not done yet.
    ^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.12/asyncio/base_events.py", line 793, in call_soon
    self._check_closed()
  File "/usr/lib64/python3.12/asyncio/base_events.py", line 540, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Traceback (most recent call last):
  File "/usr/lib64/python3.12/asyncio/queues.py", line 158, in get
    await getter
GeneratorExit

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py", line 1216, in _command_queue_loop
    command = await self.command_queue.async_get()
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py", line 28, in async_get
    return await self._queue.get()
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.12/asyncio/queues.py", line 160, in get
    getter.cancel()  # Just in case getter is not done yet.
    ^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.12/asyncio/base_events.py", line 793, in call_soon
    self._check_closed()
  File "/usr/lib64/python3.12/asyncio/base_events.py", line 540, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending! 
task: <Task pending name='Task-21' coro=<VirtualCircuitManager._command_queue_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto
/asyncio/client.py:1212> wait_for=<Future cancelled> cb=[_TaskHandler._remove_completed_task()]>

Cheers,
F.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant