Router/Dealer Pattern facing unnecessary delay during multiple connections #987

Open
PanagiotisDrakatos opened this issue May 1, 2024 · 6 comments

Comments

@PanagiotisDrakatos

I am using the JeroMQ server socket model, specifically the ROUTER/DEALER pattern, because I want to validate the identity of the clients. My problem is that I see random latency spikes when I run a loop in which the server binds the socket and the client connects: the delay ranges from negligible up to 500 ms. Why is this happening? How can I avoid this latency, if that is possible? What am I doing wrong? Here is my simple test code.

package sockets;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

import java.nio.charset.StandardCharsets;

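public class ZmqStack {
    // NOFLAGS was statically imported from sockets.rtdealer, which is not shown here;
    // it is defined locally so that this example compiles on its own.
    private static final int NOFLAGS = 0;
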
    public static void main(String[] args) throws InterruptedException {
        Thread brokerThread = new Thread(() -> {
            while (true) {
                try (ZContext context = new ZContext()) {
                    ZMQ.Socket broker = context.createSocket(SocketType.ROUTER);
                    broker.bind("tcp://*:5555");
                    String identity = new String(broker.recv());
                    String data1 = new String(broker.recv());
                    String identity2 = new String(broker.recv());
                    String data2 = new String(broker.recv());
                    System.out.println("Identity: " + identity + " Data: " + data1);
                    System.out.println("Identity: " + identity2 + " Data: " + data2);

                    broker.sendMore(identity.getBytes(ZMQ.CHARSET));
                    broker.send("xxx1".getBytes(StandardCharsets.UTF_8));

                    broker.sendMore(identity2.getBytes(ZMQ.CHARSET));
                    broker.send("xxx12");

                    broker.close();
                    context.destroy();
                }
            }
        });
        brokerThread.setName("broker");

        Thread workerThread = new Thread(() -> {
            while (true) {
                try (ZContext context = new ZContext()) {
                    ZMQ.Socket worker = context.createSocket(SocketType.DEALER);
                    String identity = "identity1";
                    worker.setIdentity(identity.getBytes(ZMQ.CHARSET));
                    worker.connect("tcp://localhost:5555");


                    worker.send("Hello1".getBytes(StandardCharsets.UTF_8));

                    String workload = new String(worker.recv(NOFLAGS));
                    System.out.println(Thread.currentThread().getName() + " - Received " + workload);
                }
            }
        });
        workerThread.setName("worker");

        Thread workerThread1 = new Thread(() -> {
            while (true) {
                try (ZContext context = new ZContext()) {
                    ZMQ.Socket worker = context.createSocket(SocketType.DEALER);
                    worker.setIdentity("Identity2".getBytes(ZMQ.CHARSET));

                    worker.connect("tcp://localhost:5555");

                    long start = System.currentTimeMillis();
                    worker.send("Hello2 " + Thread.currentThread().getName());
                    String workload = new String(worker.recv(NOFLAGS));
                    long finish = System.currentTimeMillis();
                    long timeElapsed = finish - start;
                    System.out.println(Thread.currentThread().getName() + " - Received " + workload);
                    System.out.println("Elapsed Time: " + timeElapsed);
                }
            }
        });
        workerThread1.setName("worker1");

        workerThread1.start();
        workerThread.start();
        brokerThread.start();
    }
}


@fbacchella
Contributor

Did you try with Epsilon: A No-Op Garbage Collector?

@PanagiotisDrakatos
Author

@fbacchella I have been running with
-XX:+UnlockExperimentalVMOptions -XX:+UseEpsilonGC
but the problem persists. Are there any other possible solutions?

Thanks in advance

@fbacchella
Contributor

The context and sockets are created inside the while loop, so everything is new on each iteration. I would rather look at low-level problems such as socket exhaustion.
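
One way to check the low-level hypothesis is to time bare TCP connects on the same machine with no JeroMQ code involved. The following is a minimal sketch that uses only `java.net`; the class name `TcpConnectProbe`, the use of an OS-assigned port, the 2-second timeouts, and the iteration count are illustrative assumptions and not part of the original repro. If plain connects show similar spikes, the delay probably sits in the OS TCP stack rather than in the library.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

class TcpConnectProbe {

    // Binds a fresh listener, connects to it once, and returns the elapsed
    // nanoseconds for connect plus accept. No ZeroMQ code is involved.
    public static long timeOneConnect() throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 lets the OS pick a free port (assumption; the repro binds 5555).
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket()) {
            server.setSoTimeout(2000); // fail instead of hanging if accept stalls
            long start = System.nanoTime();
            client.connect(new InetSocketAddress(loopback, server.getLocalPort()), 2000);
            try (Socket accepted = server.accept()) {
                return System.nanoTime() - start;
            }
        }
    }

    // Repeats the bind/connect/accept cycle and returns one sample per iteration.
    public static long[] measure(int iterations) {
        long[] samples = new long[iterations];
        try {
            for (int i = 0; i < iterations; i++) {
                samples[i] = timeOneConnect();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return samples;
    }

    public static void main(String[] args) {
        long[] samples = measure(50);
        for (int i = 0; i < samples.length; i++) {
            System.out.println("iteration " + i + ": " + samples[i] / 1_000_000.0 + " ms");
        }
    }
}
```

Note that this probe binds a different port on every iteration, whereas the repro rebinds port 5555 each time, so it only approximates the original loop.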

@PanagiotisDrakatos
Author

@fbacchella I forgot to mention that the spikes start after the third or fourth iteration, not after many reconnections, so I am not convinced that socket exhaustion is the problem. Are you sure about that? Any other clue?
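
To pin down when the spikes begin, each round trip could be recorded together with its iteration index. The sketch below is one hypothetical way to do that; the `SpikeLog` class, its method names, and the 100 ms threshold are assumptions made for illustration, and the samples fed in `main` are synthetic rather than measured values.

```java
import java.util.ArrayList;
import java.util.List;

class SpikeLog {
    private final long thresholdMillis;
    private final List<Integer> spikeIterations = new ArrayList<>();
    private int iteration = 0;
    private long maxMillis = 0;

    public SpikeLog(long thresholdMillis) {
        this.thresholdMillis = thresholdMillis;
    }

    // Records one round-trip sample (from System.nanoTime() differences)
    // and remembers the iteration index if it reaches the threshold.
    public void record(long elapsedNanos) {
        long millis = elapsedNanos / 1_000_000;
        if (millis >= thresholdMillis) {
            spikeIterations.add(iteration);
        }
        maxMillis = Math.max(maxMillis, millis);
        iteration++;
    }

    public List<Integer> spikeIterations() {
        return spikeIterations;
    }

    public long maxMillis() {
        return maxMillis;
    }

    // Index of the first iteration that reached the threshold, or -1 if none did.
    public int firstSpike() {
        return spikeIterations.isEmpty() ? -1 : spikeIterations.get(0);
    }

    public static void main(String[] args) {
        SpikeLog log = new SpikeLog(100); // hypothetical 100 ms threshold
        // Synthetic samples in nanoseconds, standing in for real measurements.
        long[] synthetic = {1_000_000L, 2_000_000L, 500_000_000L, 1_000_000L};
        for (long sample : synthetic) {
            log.record(sample);
        }
        System.out.println("first spike at iteration " + log.firstSpike());
        System.out.println("max latency in ms: " + log.maxMillis());
    }
}
```

In the worker loop, the value passed to `record` would be the difference of two `System.nanoTime()` readings taken around the `send`/`recv` pair.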

@fbacchella
Contributor

I ran your code; there are a few hiccups of about 100-200 ms, but not many.
A long time ago a similar question was asked and I did some investigation that might give you some leads: #723 (comment)

@PanagiotisDrakatos
Author

@fbacchella Things become even worse when I try it on a different machine rather than localhost, with just one client.

Here is the server (ROUTER) part:

package sockets;
import org.zeromq.ZMQ;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Random;
public class Zmq6 {
    static final String URI = "tcp://192.168.1.106:5555";
    static final int NOFLAGS = 0;

    public static class Server implements Runnable {
        public final String name;

        Server(String name) { this.name = name; }

        public void run() {
            ZMQ.Context context = ZMQ.context(1);
            ZMQ.Socket socket = context.socket(ZMQ.ROUTER);
            socket.setImmediate(true);
            socket.setProbeRouter(true);
            socket.bind(URI);

            String address1 = "A";

            long start = System.currentTimeMillis();
            String identity = new String(socket.recv());
            String data1 = new String(socket.recv());
            long finish = System.currentTimeMillis();
            long timeElapsed = finish - start;
            System.out.println(timeElapsed);
            // Wait a second for the workers to connect their sockets.
            System.out.println("Workers started, sleeping 1 second for warmup.");

            socket.send(address1, ZMQ.SNDMORE);
            socket.send("This is the workload.".getBytes(), NOFLAGS);
            socket.close();
            context.term();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            String address = "A";
            Thread warmup = new Thread(new Server(address));
            warmup.start();
            warmup.join();
        }
    }
}

Here is the client (DEALER) part:

package io.Adrestus;

import org.junit.jupiter.api.Test;
import org.zeromq.ZMQ;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Random;
public class justTest {
    static final String URI = "tcp://192.168.1.106:5555";
    static final int NOFLAGS = 0;
    public static class Worker implements Runnable {
        public final String name;
        private final byte[] END = "END".getBytes();

        Worker(String name) { this.name = name; }

        public void run() {
            ZMQ.Context context = ZMQ.context(1);
            ZMQ.Socket socket = context.socket(ZMQ.DEALER);
            socket.setIdentity(name.getBytes());
            socket.setImmediate(true);
            socket.setProbeRouter(true);
            socket.connect(URI);

            socket.send("Hello1".getBytes(StandardCharsets.UTF_8));
            long start = System.currentTimeMillis();
            byte[] data = socket.recv(NOFLAGS);
            long finish = System.currentTimeMillis();
            long timeElapsed = finish - start;
            System.out.println(new String(data, StandardCharsets.UTF_8)+" "+timeElapsed);
            socket.close();
            context.term();
        }
    }
    @Test
    public void test() throws InterruptedException {
        while (true) {
            String address1 = "A";
            Thread workerA = new Thread(new Worker(address1));
            workerA.start();
            workerA.join();
        }
    }
}

Here are the random spikes (in ms):
[image]
