Documentation not clear on monitor socket limitations #1554

Open
Jeducious opened this issue Jun 25, 2021 · 4 comments

Comments

@Jeducious

Referring to get_monitor_socket(events = [], addr=None) which is documented here

Problem

I recently ran into an issue whilst creating an automated test. I wanted to simulate the Majordomo Protocol being used with a pool of about 25 clients and 50 workers.

Most of the test runs failed. The error came from socket.get_monitor_socket() and reported that the address was already in use.

Setup

I had created a context class to handle the setup of a client and worker. It contained boilerplate code for connecting zmq sockets to the broker.

I use monitor sockets to ensure that the sockets I connect to the broker have finished their handshake and are ready to start sending/receiving data. I simply wait for the 'connected' event and then return from an asyncio method, allowing the calling client to begin requesting.

The automated test uses asyncio tasks, one for each client and one for each worker. Each client and worker gets one monitor socket as a result so the respective client/worker can ensure the socket connection to the broker is a success before proceeding further.
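
For illustration, the wait-for-'connected' step described above might look roughly like the sketch below. It is not the code from the original test: the `wait_connected` helper, the ROUTER socket standing in for the broker, and the 5-second timeout are all assumptions made for the example. Events are decoded with `parse_monitor_message` from `zmq.utils.monitor`.

```python
import asyncio

import zmq
import zmq.asyncio
from zmq.utils.monitor import parse_monitor_message


async def wait_connected(mon_sock, timeout=5.0):
    """Hypothetical helper: return once the monitor reports EVENT_CONNECTED."""

    async def _wait():
        while True:
            # Each monitor message is two frames: packed event data + endpoint.
            frames = await mon_sock.recv_multipart()
            event = parse_monitor_message(frames)
            if event["event"] == zmq.EVENT_CONNECTED:
                return event

    return await asyncio.wait_for(_wait(), timeout)


async def demo():
    ctx = zmq.asyncio.Context()
    # Assumption: a plain ROUTER socket stands in for the Majordomo broker.
    broker = ctx.socket(zmq.ROUTER)
    port = broker.bind_to_random_port("tcp://127.0.0.1")

    client = ctx.socket(zmq.DEALER)
    # Start monitoring before connect() so the event cannot be missed.
    mon_sock = client.get_monitor_socket(events=zmq.EVENT_CONNECTED)
    try:
        client.connect(f"tcp://127.0.0.1:{port}")
        event = await wait_connected(mon_sock)
        return event["event"] == zmq.EVENT_CONNECTED
    finally:
        # Stop monitoring and close the monitor before the socket itself.
        client.disable_monitor()
        mon_sock.close()
        client.close(linger=0)
        broker.close(linger=0)
        ctx.term()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```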

Method to resolve the problem

The method get_monitor_socket(events = [], addr=None) is in zmq/sugar/socket.py; note lines 845 to 847:

        if addr is None:
            # create endpoint name from internal fd
            addr = "inproc://monitor.s-%d" % self.FD

I noticed that self.FD was used to distinguish the endpoints for each monitor socket instance. I guessed that this might refer to file descriptors, and presumed that I might have run out of them (although only about 75 sockets were needed for this test, plus one monitor socket for each client/worker socket).

So I made a small change to my code, instead of leaving the addr argument blank, I populated it using uuid.uuid4() to ensure each endpoint was unique.
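
A minimal sketch of that workaround, assuming an inproc endpoint built from a UUID. The `unique_monitor_addr` helper and the `inproc://monitor.` prefix are illustrative choices, not part of pyzmq; only the `addr` keyword of `get_monitor_socket` comes from the library.

```python
import uuid

import zmq


def unique_monitor_addr():
    """Hypothetical helper: build an inproc endpoint that cannot collide."""
    return f"inproc://monitor.{uuid.uuid4().hex}"


ctx = zmq.Context()
sock = ctx.socket(zmq.DEALER)

# Pass an explicit address instead of relying on the FD-derived default.
addr = unique_monitor_addr()
mon_sock = sock.get_monitor_socket(addr=addr)

# Clean up: stop monitoring, then close the monitor, then the socket.
sock.disable_monitor()
mon_sock.close()
sock.close(linger=0)
ctx.term()
```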

This suggests something a bit odd, though. If I were truly running out of file descriptors, I would expect the test to still fail, but it does not. So I may be wrong about the meaning of self.FD. I ran a test to examine how many file descriptors were actually open. The numbers closely, but not exactly, matched the values of self.FD printed to the console.

Impact on pyzmq consumers

In any case, the documentation gives no hint that you can run out of monitor sockets if you don't specify the addr argument. I admit that my use case is a bit extreme, but I am building something I hope will be extreme, and I have to test it. Profiling performance will be a big part of tuning the system and making sure it's fit for purpose, so being able to simulate lots of clients and workers on just one physical system is important for development.

And that's where I feel this issue really becomes relevant: it impacts people trying to do ambitious projects.

Requested changes (and I am happy to fork your documentation source and issue a PR if you like)

I think it would be good to add a note to the current documentation to alert devs to the fact that there is an upper limit on the number of sockets available if no addr argument is given.

@minrk
Member

minrk commented Jun 25, 2021

Yes, absolutely! It is appropriate to add to the docstring that if addr is unspecified, a unique per-socket address will be used (that's what the FD is used for - it's a unique integer across currently open sockets, which is all we really need), meaning that it can be called at most once per socket without specifying addr. If you'd like to make a PR clarifying that, it would be most welcome.

It is quite unusual to connect multiple monitors to one socket, which should be the only way this error can occur. Is there a reason you are using multiple simultaneous monitors on one socket? If you're not doing that, can you share an example to illustrate the problem?

@Jeducious
Author

Jeducious commented Jun 25, 2021

Hi @minrk, I am almost certain that these are separate instances of the pyzmq socket class.

I have a class that I create for each client, which then does:

async def connect(self):

    logger.info("Creating a zmq_socket to connect to broker")

    self.zmq_socket = self.z_cont.socket(self.SOCK_TYPE)

    logger.info("Creating a monitor zmq_socket to listen for connect event")
    # monitor for connect
    mon_sock = self.zmq_socket.get_monitor_socket()

Each of the instances of the class has a new instance of a zmq.DEALER socket which is where I call the get_monitor_socket.

So as far as I can see I am making a new zmq socket and asking for a mon sock.

What are your thoughts?

@minrk
Member

minrk commented Jun 25, 2021

Looks like it's a failure to close the monitor sockets, which are not closed implicitly if you close the socket itself (because then your monitor couldn't get close events!)

To fully close a monitored socket and its monitors, you must do:

socket.disable_monitor()
mon_socket.close()
socket.close()

This is absolutely not obvious (disable_monitor must be before mon_socket.close()), and should be clearly documented.

Here's a complete example that can create thousands of sockets, where it would fail at 30-50 before:

import asyncio

import tqdm
import zmq
import zmq.asyncio

class Thing:
    def __init__(self):
        self.z_cont = zmq.asyncio.Context.instance()
        self.SOCK_TYPE = zmq.PUB

    async def connect(self):

        self.zmq_socket = self.z_cont.socket(self.SOCK_TYPE)

        #monitor for connect
        self.mon_sock = self.zmq_socket.get_monitor_socket()

    def close(self):
        self.zmq_socket.disable_monitor()
        self.mon_sock.close()
        self.zmq_socket.close()

async def main():
    for i in tqdm.tqdm(range(5000)):
        t = Thing()
        await t.connect()
        t.close()
        # close is async, so it takes a finite amount of time to free FDs
        await asyncio.sleep(1e-3)

if __name__ == "__main__":
    asyncio.run(main())
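
One possible way to make the three-step teardown above hard to get wrong is to wrap it in a context manager. This is only a sketch: `monitored_socket` is a hypothetical helper, not something pyzmq provides, and it uses a blocking `zmq.Context` for brevity.

```python
import contextlib

import zmq


@contextlib.contextmanager
def monitored_socket(ctx, sock_type, events=None):
    """Hypothetical helper: yield (socket, monitor), then close both in order."""
    sock = ctx.socket(sock_type)
    # events=None is get_monitor_socket's default, meaning all events.
    mon_sock = sock.get_monitor_socket(events)
    try:
        yield sock, mon_sock
    finally:
        sock.disable_monitor()  # 1. stop monitoring first
        mon_sock.close()        # 2. then close the monitor socket
        sock.close()            # 3. finally close the monitored socket


ctx = zmq.Context()
with monitored_socket(ctx, zmq.DEALER) as (sock, mon_sock):
    pass  # connect, wait for events, send/receive ...
ctx.term()
```

Because the cleanup lives in a `finally` clause, it also runs when the body of the `with` block raises.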

@Jeducious
Author

Daaaammnnn!! OK, that makes sense, and yes, let's definitely update the docs then. I mean, I was pretty sloppy to be honest; I know I wasn't living up to good housekeeping by explicitly closing sockets. So this has reinforced that lesson. But it would certainly not be obvious to others either.

And your example is spookily on point for the failure point: I was at about 25 on the client side when it failed. I'm going to implement what you just showed me. Heck, I should have done this in the async teardown function of the Python unit tests that I am using.
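
As a rough sketch of what that async teardown could look like, assuming the tests are built on the standard library's `unittest.IsolatedAsyncioTestCase` (the thread does not say which framework is used, and the class and test names here are made up):

```python
import unittest

import zmq
import zmq.asyncio


class MonitoredClientTest(unittest.IsolatedAsyncioTestCase):
    async def asyncSetUp(self):
        self.ctx = zmq.asyncio.Context()
        self.zmq_socket = self.ctx.socket(zmq.DEALER)
        self.mon_sock = self.zmq_socket.get_monitor_socket()

    async def asyncTearDown(self):
        # Same order as in the example above: disable, close monitor, close socket.
        self.zmq_socket.disable_monitor()
        self.mon_sock.close()
        self.zmq_socket.close(linger=0)
        self.ctx.term()

    async def test_sockets_open_during_test(self):
        # Placeholder test body; a real test would connect and exchange messages.
        self.assertFalse(self.zmq_socket.closed)
        self.assertFalse(self.mon_sock.closed)


if __name__ == "__main__":
    unittest.main()
```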
