SYNC PUB-SUB dropping messages? #358

Open
ocaballeror opened this issue May 30, 2020 · 4 comments

@ocaballeror
Contributor

Continuing the discussion from #357

Consider the following code:

import time
from osbrain import run_nameserver, run_agent, Agent

def log_msg(agent, msg):
    agent.log_info('Received: %s' % msg)

class ListAgent(Agent):
    def on_init(self):
        self.received = []
    
    def append(self, msg):
        self.log_info('Received %s' % msg)
        self.received.append(msg)

    def idle(self):
        self.log_info('Nothing to process')

class MasterAgent(Agent):
    def on_init(self):
        self.both_received = False
        self.addr = self.bind("SYNC_PUB", alias='main', handler=log_msg)

    def start(self):
        agent1 = run_agent('Agent1', base=ListAgent)
        agent2 = run_agent('Agent2', base=ListAgent)
        agent1.connect(self.addr, handler="append")
        agent2.connect(self.addr, handler="append")
        self.send('main', 'Hello world')
        time.sleep(2)
        recv1 = bool(agent1.get_attr('received'))
        recv2 = bool(agent2.get_attr('received'))
        self.both_received = recv1 and recv2

if __name__ == "__main__":
    ns = run_nameserver()
    master = run_agent('Master', base=MasterAgent)
    master.start()
    try:
        assert master.get_attr('both_received')
    finally:
        ns.shutdown()

I tried extending the sleep time up to 10 seconds to give the message more time to arrive, but it didn't help: sometimes a subscriber never gets the message at all. Here are some odd things I noticed that I can't explain:

  • If the publisher sends a second message, the subscribers receive it just fine.
  • Removing the MasterAgent class and putting all that code directly in the "main" block solves the issue.
  • The subscriber missing the publication is always the second one to be created. Changing the order in which they connect doesn't seem to make a difference.
  • I've never run into a situation where neither agent gets the message. At least one of them always receives it.

I am still confused 😕

@Peque
Member

Peque commented May 30, 2020

Yeah, I do not think the sleep matters here. I think you should be able to reproduce it if you set sleep(0.01) too.

In a PUB-SUB pattern (asynchronous communication) it is expected that one of the subscribers may miss the first message. The subscription is not guaranteed to happen immediately, so there is a chance the ListAgent is a little slow to subscribe and, hence, misses the first published messages.

I think the second agent is always the one missing the message because the first one had a little more time to subscribe, while the second had less. But both could miss the first message, or even more than one. It is just about timing.

If my theory is correct, you should not be able to reproduce it if you do something like:

time.sleep(0.1)
self.send('main', 'Hello world')
time.sleep(0.1)

Note the added delay before sending the message; time.sleep(0.01) is probably enough too.
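The timing theory above (what the ZeroMQ guide calls the "slow joiner" problem) can be illustrated without osBrain or ZeroMQ. The following is a toy, in-process sketch using only the standard library; `ToyPublisher`, `connect_async` and `drain` are hypothetical names, not osBrain or pyzmq APIs. Like a PUB socket, the publisher delivers each message only to subscribers that are already registered, and `connect_async` returns before the subscription is in place, mimicking the asynchronous subscription performed by `.connect()`. The fixed 0.2 s delay is an arbitrary assumption standing in for the real, unpredictable latency.

```python
import queue
import threading


class ToyPublisher:
    """Hypothetical stand-in for a PUB socket: a message only reaches the
    subscribers registered at the moment it is published."""

    def __init__(self):
        self._lock = threading.Lock()
        self._inboxes = []

    def register(self, inbox):
        with self._lock:
            self._inboxes.append(inbox)

    def publish(self, msg):
        with self._lock:
            targets = list(self._inboxes)
        for inbox in targets:
            inbox.put(msg)


def connect_async(publisher, delay=0.2):
    """Return immediately, like .connect(); the subscription is registered
    later on a background timer thread. `delay` is an arbitrary stand-in
    for the real (unpredictable) subscription latency."""
    inbox = queue.Queue()
    worker = threading.Timer(delay, publisher.register, args=(inbox,))
    worker.start()
    return inbox, worker


def drain(inbox):
    """Collect every message currently waiting in an inbox."""
    received = []
    while not inbox.empty():
        received.append(inbox.get())
    return received


pub = ToyPublisher()
inbox1, worker1 = connect_async(pub)
inbox2, worker2 = connect_async(pub)

# Published right after "connecting": the timers have not fired yet, so
# neither subscription is registered and this message is dropped.
pub.publish('Hello world')

# Real code can only sleep and hope the subscription has landed; the toy
# can wait for it explicitly, which keeps the demonstration deterministic.
worker1.join()
worker2.join()
pub.publish('Hello again')

received1, received2 = drain(inbox1), drain(inbox2)
print(received1, received2)
```

In this toy the first message reaches neither inbox and the second reaches both. In the real system the subscription latency varies per subscriber, which would explain why sometimes only one of them misses the first message.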

Anyway, that should not be a reason to hang... That is what I am unable to reproduce. Can you confirm the code is hanging under Linux? If so, which distribution? Is it virtualized or is it WSL?

Peque added this to the 0.7.0 milestone on May 30, 2020
@ocaballeror
Contributor Author

I understand it's an asynchronous communication pattern and you may not receive all the messages, but don't you think the above code should work? I would expect the call to .connect() to return only once the agent is connected to the publisher channel and ready to receive messages.

In any case, having to insert manual sleeps after connecting just to make sure you get the messages feels wrong. Maybe there should be some kind of built-in mechanism to ensure the agent is actually listening, don't you think?

@ocaballeror
Contributor Author

By the way, this code never hangs; sorry for the misunderstanding. I was referring to the example from #357. In that case, the parent agent waits for both of its children to respond, but since one of them never got the first message, it never responds and the parent waits forever.

Maybe "hangs" was a confusing word; I should have said it runs into a deadlock.
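The deadlock described above follows directly from a parent that blocks until every child has replied. A toy sketch using only the standard library (`wait_for_replies` is a hypothetical helper, not part of osBrain) models the parent's inbox as a queue: with a plain blocking `get()` a lost message means waiting forever, whereas a timeout, swapped in here purely for illustration, turns the lost message into a visible error instead of a silent deadlock.

```python
import queue


def wait_for_replies(inbox, expected, timeout=None):
    """Block until `expected` replies have arrived in `inbox`.

    With timeout=None, a child that never replies makes this wait forever
    (the deadlock described above). With a timeout, each wait is bounded
    and a missing reply raises TimeoutError instead."""
    replies = []
    while len(replies) < expected:
        try:
            replies.append(inbox.get(timeout=timeout))
        except queue.Empty:
            raise TimeoutError(
                f'got {len(replies)} of {expected} replies; '
                'a child probably missed the request'
            ) from None
    return replies


inbox = queue.Queue()
inbox.put('reply from Agent1')
# Agent2 missed the first published message, so it never replies.

error_message = None
try:
    wait_for_replies(inbox, expected=2, timeout=0.5)
except TimeoutError as error:
    error_message = str(error)
print(error_message)  # prints: got 1 of 2 replies; a child probably missed the request
```

A timeout does not fix the lost message; it only makes the failure detectable, so the parent can log it or retry instead of waiting forever.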

@Peque
Member

Peque commented Jun 1, 2020

@ocaballeror Yeah, the subscription process is asynchronous too. That means ZMQ performs the subscription asynchronously when you call .connect(), so .connect() can return before the agent is actually subscribed.

We could add some code in osBrain, but I think that may be a bit application-specific.

Although with the SYNC_PUB pattern it may not be that hard:

  • From the client, we send a "ping" request through the SYNC_PUB channel, in a loop
  • From the server, we receive the "ping" and reply with a "pong"
  • When the client receives the "pong", it is ready and guaranteed to be receiving messages from the PUB socket
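The handshake in the list above can be sketched with a toy, in-process model using only the standard library; `ToySyncPub`, `serve_pings`, `connect_and_wait` and `publications` are hypothetical names, not osBrain API. The key assumption, matching the last bullet, is that the server's pong travels over the broadcast (PUB) side, so receiving it proves the subscription is already live. Pings sent before that point produce pongs that nobody receives, which is why the client pings in a loop.

```python
import queue
import threading


class ToySyncPub:
    """Hypothetical stand-in for a SYNC_PUB channel: a broadcast side that
    only reaches already-registered subscribers, plus a request queue that
    clients can always write to."""

    def __init__(self):
        self._lock = threading.Lock()
        self._inboxes = []
        self.requests = queue.Queue()

    def register(self, inbox):
        with self._lock:
            self._inboxes.append(inbox)

    def publish(self, topic, payload):
        with self._lock:
            targets = list(self._inboxes)
        for inbox in targets:
            inbox.put((topic, payload))


def serve_pings(channel, stop):
    """Server: answer each ping by publishing a pong on the broadcast side,
    using the requesting client's id as the topic."""
    while not stop.is_set():
        try:
            client_id, msg = channel.requests.get(timeout=0.05)
        except queue.Empty:
            continue
        if msg == 'ping':
            channel.publish(client_id, 'pong')


def connect_and_wait(channel, client_id, subscribe_delay=0.2, retry=0.05):
    """Client: connect (the subscription lands later, asynchronously), then
    ping in a loop until our own pong arrives on the broadcast side."""
    inbox = queue.Queue()
    threading.Timer(subscribe_delay, channel.register, args=(inbox,)).start()
    while True:
        channel.requests.put((client_id, 'ping'))
        try:
            topic, payload = inbox.get(timeout=retry)
        except queue.Empty:
            continue  # no pong yet (likely published before we subscribed)
        if (topic, payload) == (client_id, 'pong'):
            return inbox  # receiving the pong proves the subscription is live


def publications(inbox, topic='main'):
    """Drop leftover pongs and keep only regular publications."""
    received = []
    while not inbox.empty():
        msg_topic, payload = inbox.get()
        if msg_topic == topic:
            received.append(payload)
    return received


channel = ToySyncPub()
stop = threading.Event()
server = threading.Thread(target=serve_pings, args=(channel, stop))
server.start()

inbox1 = connect_and_wait(channel, 'Agent1')
inbox2 = connect_and_wait(channel, 'Agent2')
channel.publish('main', 'Hello world')  # no sleep needed before sending

stop.set()
server.join()
received1, received2 = publications(inbox1), publications(inbox2)
print(received1, received2)  # prints: ['Hello world'] ['Hello world']
```

Because each client returns only once its own pong has arrived, the publication sent afterwards reaches both subscribers without any sleep. Extra pings can still produce late pongs, so in this sketch the receiving side filters messages by topic.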

We could, at least, add an example and a note in the documentation explaining how to deal with this. If it is clean and general enough, we could consider adding it to the base osBrain code too.
