Exception while processing command leaves IOThread in a bad state (not invoking inEvent to serve new commands) #905

Open
WeiliangLuo opened this issue Dec 3, 2021 · 6 comments

Comments

@WeiliangLuo

It is not obvious, but I recently found out that the IOThread has to drain all commands from its mailbox in order to be signaled again for new commands. This is because draining the messages off the underlying YPipe resets YPipe.c to -1, which makes flush() return false on the sender side; only then will the sender of a new command signal the IOThread to invoke inEvent().

However, an exception while the IOThread is processing a command stops it from draining its mailbox. As a result, the IOThread is left in the running state but is never woken up again to serve inEvent(), even though new commands are sent to it.
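
For illustration, here is a simplified, self-contained model of that contract (not jeromq's actual YPipe/Mailbox code; the class and names are invented for the example): the sender only signals when it finds the reader inactive, and the reader only becomes inactive again once it has fully drained the queue.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

//  Simplified model of the mailbox signalling contract described above.
final class MailboxModel<T>
{
    private final Queue<T> pipe = new ConcurrentLinkedQueue<>();
    //  true while the reader is considered awake (roughly: YPipe.c != -1)
    private final AtomicBoolean active = new AtomicBoolean(false);

    //  Sender side: returns true if the reader must be signalled,
    //  i.e. the equivalent of flush() returning false.
    boolean send(T cmd)
    {
        pipe.offer(cmd);
        return active.compareAndSet(false, true);
    }

    //  Reader side (inEvent): must run to completion. If process throws and
    //  the loop is abandoned, 'active' stays true forever, send() never asks
    //  for a signal again, and the reader is never woken up: the bad state
    //  described above. (The real YPipe also re-checks for racing writers
    //  when it deactivates; that detail is omitted here.)
    void drain(Consumer<T> process)
    {
        T cmd;
        while ((cmd = pipe.poll()) != null) {
            process.accept(cmd);
        }
        active.set(false);
    }
}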

The exception we hit is the same as the one mentioned in #871.
In our case, we understand that it is caused by a race condition between the worker thread notifying the IOThread to ActivateWrite on a backed-up socket and the heartbeat reconnecting to the remote end. As a result, the ActivateWrite command arrives at a new StreamEngine object that was just set up for the new connection and is not yet fully initialized, pending the handshake.
We have learned that it is not a good idea to enable heartbeats on a PULL socket, as back pressure will cause the heartbeat to trigger frequent reconnects (given a reasonably low heartbeat timeout).

I see a few different ways to handle the exception better, for example: let the IOThread die instead of failing silently, handle the exception inside inEvent(), or even call iothread.inEvent() without waiting for the signal.
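
As a rough sketch of the first option (reusing the mailbox/Command names from IOThread.inEvent(), not a tested patch), the loop could finish draining so the signalling stays consistent and then rethrow, letting the failure surface instead of silently wedging the thread:

@Override
public void inEvent()
{
    RuntimeException first = null;
    while (true) {
        //  Keep draining even if a command fails, so the mailbox can be
        //  signalled again for future commands.
        Command cmd = mailbox.recv(0);
        if (cmd == null) {
            break;
        }
        try {
            cmd.process();
        }
        catch (RuntimeException e) {
            if (first == null) {
                first = e;
            }
        }
    }
    //  Surface the first failure instead of swallowing it.
    if (first != null) {
        throw first;
    }
}
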
I am curious about your thoughts on this.

Thanks!

@fbacchella
Contributor

Do you think this commit might help you? ece1430

@fbacchella
Contributor

One more question: are you able to reproduce it? An NPE is always a bug, so it should be resolved too.

@ben-ng

ben-ng commented Dec 6, 2021

We are able to reproduce it. I'm not sure if it's easy to write a test for it yet, since it's a race condition, but I'm working on that this week.

@fbacchella
Contributor

Meanwhile, a simple patch like:

@Override
public void inEvent()
{
    //  TODO: Do we want to limit number of commands I/O thread can
    //  process in a single go?

    while (true) {
        try {
            //  Get the next command. If there is none, exit.
            Command cmd = mailbox.recv(0);
            if (cmd == null) {
                break;
            }

            //  Process the command.
            cmd.process();
        } catch (RuntimeException e) {
            getCtx().getNotificationExceptionHandler().uncaughtException(Thread.currentThread(), e);
        }
    }
}

could do the job.

@WeiliangLuo
Author

WeiliangLuo commented Dec 7, 2021

One more question: are you able to reproduce it? An NPE is always a bug, so it should be resolved too.

Here is a unit test that reproduces the issue.
The cause is essentially the race condition I mentioned above:

In our case, we understand that it is caused by a race condition between the worker thread notifying the IOThread to ActivateWrite on a backed-up socket and the heartbeat reconnecting to the remote end. As a result, the ActivateWrite command arrives at a new StreamEngine object that was just set up for the new connection and is not yet fully initialized, pending the handshake.

package org.zeromq;

import java.util.ArrayList;
import java.io.IOException;

import org.junit.Test;

import zmq.ZMQ;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZContext;

class Puller extends Thread
{
    Socket pullSock;
    int cnt = 0;

    public Puller(ZContext ctx, String host)
    {
        pullSock = ctx.createSocket(ZMQ.ZMQ_PULL);
        pullSock.setRcvHWM(1);
        pullSock.setHeartbeatIvl(5000);
        pullSock.setHeartbeatTimeout(1000);

        boolean brc = pullSock.connect(host);
    }

    public void run() {
        while (true) {
            // occasionally consume a message to trigger a command being sent to the Session
            consume();
            try {
                Thread.sleep(6000);
            }
            catch (InterruptedException e) {
                System.out.println("Sleeping interrupted..." + e);
            }
        }
    }

    void consume() {
        String msg = pullSock.recvStr();
        cnt += 1;
        if (cnt % 10 == 1) {
            System.out.println("Consumed msg [" + cnt + "]. Message: " + msg);
        }
    }
}

public class TestHeartBeatRaceCondition
{
    @Test
    public void testHeartBeatRaceCondition() throws IOException
    {
        ZContext ctx = new ZContext();
        Socket push = ctx.createSocket(ZMQ.ZMQ_PUSH);
        push.setSndHWM(1);
        String host = "tcp://127.0.0.1:12345";
        boolean brc = push.bind(host);

        ArrayList<Puller> pullers = new ArrayList<Puller>();
        for (int i = 0; i < 50; ++i) {
            pullers.add(new Puller(ctx, host));
        }

        for (Puller puller : pullers) {
            puller.start();
        }

        while (true) {
            String content = "12345678ABCDEFGH12345678abcdefgh";
            //  Send the message.
            push.send(content);
        }

    }
}

@fbacchella
Contributor

Indeed, I quickly get a

java.lang.AssertionError
	at zmq.io.StreamEngine.restartInput(StreamEngine.java:553)
	at zmq.io.SessionBase.writeActivated(SessionBase.java:276)
	at zmq.pipe.Pipe.processActivateWrite(Pipe.java:340)
	at zmq.ZObject.processCommand(ZObject.java:55)
	at zmq.Command.process(Command.java:77)
	at zmq.io.IOThread.inEvent(IOThread.java:81)
	at zmq.poll.Poller.run(Poller.java:276)
	at java.lang.Thread.run(Thread.java:748)
