
Unexpected Behaviour shown in PUB-SUB pattern #911

Open
rajputChaturya opened this issue Feb 9, 2022 · 7 comments

@rajputChaturya commented Feb 9, 2022

I am using the pub-sub pattern to fan out a stream to 25 subscribers, so data flows from 1 publisher to 25 subscribers.

The payload size of a single message is 256 bytes.
We publish 40k messages/sec (as a batch of 4k messages every 100 ms).
The publisher uses 5 I/O threads.

The publisher ran on a 32-core, 126 GB RAM Azure VM.
Each of the 25 subscribers ran on a 4-core, 16 GB RAM Azure VM.

The test ran for 12 hours and the results are shown below.
p95, p99, ... are in milliseconds.
mean_rate is the mean number of messages received per second at the subscriber side.
name is the VM the subscriber was running on.
count is the total number of messages delivered to the subscriber over the 12 hours.

   name  min  max      mean  p50   p75   p95   p98   p99  p999       count     mean_rate
0   vm.27   -1    0 -0.792223 -1.0  -1.0   0.0   0.0   0.0   0.0  1729096000  39999.223763
1   vm.29   -1   30 -0.206549  0.0   0.0   0.0   0.0   0.0  24.0  1729326000  40000.012745
2   vm.28   -1    1 -0.221198  0.0   0.0   0.0   1.0   1.0   1.0  1729312000  39999.594228
3   vm.30   -1    2  0.187631  0.0   0.0   1.0   1.0   1.0   2.0  1729322000  39999.994097
4    vm.9    0    4  0.560987  1.0   1.0   1.0   1.0   1.0   4.0  1727722675  39999.800001
5   vm.18    0    2  0.568149  1.0   1.0   1.0   1.0   1.0   1.0  1728531783  40000.006029
6   vm.17    0   11  0.587446  1.0   1.0   1.0   1.0   1.0   5.0  1728532568  40000.017977
7    vm.8   -1   15  0.348731  0.0   1.0   1.0   1.0   2.0  12.0  1727736000  39999.957408
8   vm.13    0   11  0.998037  1.0   1.0   1.0   2.0   2.0  11.0  1728132000  39999.955226
9   vm.16    0    3  0.764761  1.0   1.0   2.0   2.0   2.0   3.0  1728338585  39999.986043
10  vm.15    0   28  1.243683  1.0   1.0   2.0   2.0   2.0  20.0  1728332000  39999.965091
11  vm.14    1    3  1.393507  1.0   2.0   2.0   2.0   2.0   3.0  1728138000  40000.040671
12  vm.31   -1    4  0.294211  0.0   0.0   1.0   2.0   3.0   4.0  1729526000  39999.992003
13  vm.32   -1    5  0.425758  0.0   1.0   1.0   3.0   3.0   5.0  1729530000  40000.014967
14  vm.10    0   11  0.594815  1.0   1.0   1.0   1.0   4.0  10.0  1727932000  39999.629982
15  vm.25    0    6  0.766227  1.0   1.0   2.0   3.0   4.0   6.0  1729106000  39999.389766
16  vm.24    0   28  0.881269  1.0   1.0   2.0   2.0   6.0  28.0  1728920000  39999.583785
17  vm.12    0   19  0.741611  1.0   1.0   1.0   1.0   7.0  16.0  1728137085  39999.860449
18  vm.26    0   14  3.304725  2.0   5.0   9.0  10.0  11.0  14.0  1729144000  39999.952394
19  vm.20    0   15  4.277812  3.0   6.0  10.0  11.0  12.0  14.0  1728737203  39999.659962
20  vm.19    1   33  4.406672  3.0   6.0  10.0  11.0  15.0  33.0  1728544732  39999.944749
21  vm.11    0   35  1.825326  1.0   2.0   3.0  11.0  19.0  34.0  1727944035  40000.008251
22  vm.21    0   25  7.798344  7.0  11.0  18.0  20.0  20.0  25.0  1728747832  39999.996823
23  vm.23    1   26  7.826982  7.0  11.0  18.0  20.0  21.0  26.0  1728910503  39999.060496
24  vm.22    1   28  8.625836  8.0  11.0  19.0  21.0  21.0  28.0  1728748000  39999.967383

As you can see from the results above, some subscribers received messages in less than 5 ms, while some took around 20 ms (talking about p99 latencies).

Why is there so much skew in the latencies? Is there an explanation for why the pub-sub pattern behaves this way?

In theory, every subscriber should receive messages in nearly the same time.

Is it possible to avoid this kind of behaviour and get nearly equal latencies at each subscriber?

@trevorbernard (Member)

The test results image is broken.

I'm assuming you're using TCP, and the behaviour there is to fan out all the messages to the subscribers, so that may explain the latency you are seeing. You may want to look at multicast (epgm) if you don't want this behaviour. Unfortunately, it's not supported in JeroMQ, but we're more than happy to accept any PRs that add it.
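
For reference only (not part of the original comment): in libzmq bindings that do support PGM, a multicast endpoint is written as interface;multicast-group:port, for example

epgm://eth0;239.192.1.1:5555

JeroMQ does not currently support this transport, so the address form is shown purely for comparison with the tcp:// endpoints used in the code below.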

@rajputChaturya (Author) commented Feb 9, 2022

Hi @trevorbernard, thanks for the quick reply. I have fixed the broken image, so you should be able to see the results now.
Any thoughts on why there is such skew in the latencies, and on how to improve it, are welcome.

As for multicast, we can only use unicast for our application, so it's not an option for us.

@rajputChaturya (Author)

Code for the publisher:

package publishers;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Slf4jReporter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

@Slf4j
public class Publisher {
    private static final int PORT = Integer.parseInt(System.getProperty("port", "5555"));
    private static final String TOPIC_NAME = System.getProperty("topicName", "zeromq-perf-test");
    private static final int PAYLOAD_SIZE_IN_BYTES = Integer.parseInt(System.getProperty("payloadSizeInBytes", "256"));
    private static final int BATCH_SIZE = Integer.parseInt(System.getProperty("batchSize", "4000"));
    private static final int PRODUCE_DELAY = Integer.parseInt(System.getProperty("produceDelay", "100"));
    private static final int IO_THREADS = Integer.parseInt(System.getProperty("ioThreads", "5"));
    private static final byte[] TOPIC_NAME_BYTES = TOPIC_NAME.getBytes(ZMQ.CHARSET);

    @SneakyThrows
    public static void main(String[] args) {
        printReceivedSystemProperties();

        MetricRegistry metricRegistry = new MetricRegistry();
        setupMetrics(metricRegistry);

        try (ZContext context = new ZContext(IO_THREADS)) {
            context.setSndHWM(BATCH_SIZE);
            log.info("Publisher started on port: {}", PORT);

            ZMQ.Socket publisher = context.createSocket(SocketType.PUB);
            String address = String.format("tcp://*:%d", PORT);
            publisher.bind(address);

            ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

            ScheduledFuture<?> future = scheduledExecutorService.scheduleAtFixedRate(() -> sendBatch(publisher, metricRegistry), 5000, PRODUCE_DELAY, TimeUnit.MILLISECONDS);
            future.get();
        }
    }

    private static void sendMessage(ZMQ.Socket publisher, MetricRegistry metricRegistry) {
        long timeStamp = System.currentTimeMillis();
        // payload layout: [topic bytes][8-byte send timestamp][random padding], PAYLOAD_SIZE_IN_BYTES in total
        byte[] randomBytesPayload = RandomUtils.nextBytes(PAYLOAD_SIZE_IN_BYTES - TOPIC_NAME_BYTES.length - 8);

        ByteBuffer byteBuffer = ByteBuffer.allocate(PAYLOAD_SIZE_IN_BYTES);
        byteBuffer.put(TOPIC_NAME_BYTES);
        byteBuffer.putLong(timeStamp);
        byteBuffer.put(randomBytesPayload);

        boolean sent = publisher.send(byteBuffer.array());

        if (sent) {
            metricRegistry.meter(MetricName.SEND_MSG).mark();
        }
        else {
            metricRegistry.meter(MetricName.DROPPED_MSG).mark();
        }
    }

    private static void sendBatch(ZMQ.Socket publisher, MetricRegistry metricRegistry) {
        IntStream.range(0, BATCH_SIZE).forEach(k -> sendMessage(publisher, metricRegistry));
    }

    private static void setupMetrics(MetricRegistry metricRegistry) {
        Slf4jReporter slf4jReporter = Slf4jReporter.forRegistry(metricRegistry)
                .convertDurationsTo(TimeUnit.MILLISECONDS)
                .convertRatesTo(TimeUnit.SECONDS)
                .withLoggingLevel(Slf4jReporter.LoggingLevel.INFO)
                .build();
        slf4jReporter.start(5000, 5000, TimeUnit.MILLISECONDS);
    }

    private static class MetricName {
        public static final String SEND_MSG = "send.msg";
        public static final String DROPPED_MSG = "dropped.msg";
    }

    private static void printReceivedSystemProperties() {
        log.info("--------------------------------");
        log.info("PORT: {}", PORT);
        log.info("TOPIC_NAME: {}", TOPIC_NAME);
        log.info("PAYLOAD_SIZE_IN_BYTES: {}", PAYLOAD_SIZE_IN_BYTES);
        log.info("PRODUCE_DELAY: {}", PRODUCE_DELAY);
        log.info("BATCH_SIZE: {}", BATCH_SIZE);
        log.info("IO_THREADS: {}", IO_THREADS);
        log.info("--------------------------------");
    }
}
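
The publisher is configured entirely through the system properties read at the top of the class. As an illustration only (the jar name is hypothetical), a run matching the setup described above would look like:

java -Dport=5555 -DbatchSize=4000 -DproduceDelay=100 -DioThreads=5 -DpayloadSizeInBytes=256 -cp perf-test.jar publishers.Publisher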

@rajputChaturya (Author)

Code for the subscriber:

package subscribers;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Slf4jReporter;
import lombok.extern.slf4j.Slf4j;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

@Slf4j
public class Subscriber {
    private static final String PUBLISHER_ENDPOINT = System.getProperty("publisherEndpoint", "localhost:5555");
    private static final String TOPIC_NAME = System.getProperty("topicName", "zeromq-perf-test");
    private static final Integer EXPECTED_PAYLOAD_SIZE = Integer.parseInt(System.getProperty("expectedPayloadSize", "256"));
    private static final int IO_THREADS = Integer.parseInt(System.getProperty("ioThreads", "1"));
    private static final String VM_NUMBER = System.getProperty("vmNumber");
    private static final int TOPIC_NAME_BYTES_LENGTH = TOPIC_NAME.getBytes(StandardCharsets.UTF_8).length;

    public static void main(String[] args) {
        printReceivedSystemProperties();

        MetricRegistry metricRegistry = new MetricRegistry();
        setupMetrics(metricRegistry);

        try (ZContext context = new ZContext(IO_THREADS)) {
            log.info("Subscriber started listening to publisher: {}", PUBLISHER_ENDPOINT);

            ZMQ.Socket subscriber = context.createSocket(SocketType.SUB);
            String address = String.format("tcp://%s", PUBLISHER_ENDPOINT);
            subscriber.connect(address);

            subscriber.subscribe(TOPIC_NAME);

            while (!Thread.currentThread().isInterrupted()) {
                if(!receiveMessage(subscriber, metricRegistry)) {
                    log.info("error in message received, therefore exiting");
                    break;
                }
            }
        }
    }

    private static boolean receiveMessage(ZMQ.Socket subscriber, MetricRegistry metricRegistry) {
        byte[] bytes = subscriber.recv();
        if (bytes == null) {
            log.info("bytes = null received");
            return false;
        }

        // unwrap the bytes received
        ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
        byte[] topicBytes = new byte[TOPIC_NAME_BYTES_LENGTH];
        byteBuffer.get(topicBytes);
        String topicNameReceived = new String(topicBytes, ZMQ.CHARSET);
        long timeStampReceived = byteBuffer.getLong();

        // check the topic name received
        if (!TOPIC_NAME.equals(topicNameReceived)) {
            log.info("wrong topic name encountered, {}", topicNameReceived);
            return false;
        }

        // record event (one-way latency; this relies on the publisher and subscriber wall clocks being synchronized)
        final long now = System.currentTimeMillis();

        metricRegistry.meter(MetricName.RECV_MSG).mark();
        metricRegistry.histogram(MetricName.RECV_MSG_LATENCY).update(now - timeStampReceived);

        return true;
    }

    private static void setupMetrics(MetricRegistry metricRegistry) {
        Slf4jReporter slf4jReporter = Slf4jReporter.forRegistry(metricRegistry)
                .convertDurationsTo(TimeUnit.MILLISECONDS)
                .convertRatesTo(TimeUnit.SECONDS)
                .withLoggingLevel(Slf4jReporter.LoggingLevel.INFO)
                .build();
        slf4jReporter.start(5000, 5000, TimeUnit.MILLISECONDS);
    }

    private static class MetricName {
        public static final String RECV_MSG_LATENCY =  "recv.msg.latency." + VM_NUMBER;
        public static final String RECV_MSG = "recv.msg." + VM_NUMBER;
    }

    private static void printReceivedSystemProperties() {
        log.info("--------------------------------");
        log.info("PUBLISHER_ENDPONT: {}", PUBLISHER_ENDPOINT);
        log.info("TOPIC_NAME: {}", TOPIC_NAME);
        log.info("EXPECTED_PAYLOAD_SIZE: {}", EXPECTED_PAYLOAD_SIZE);
        log.info("IO_THREADS: {}", IO_THREADS);
        log.info("VM_NUMBER: {}", VM_NUMBER);
        log.info("--------------------------------");
    }
}
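
Likewise for the subscriber, as an illustration only (the endpoint address and jar name are hypothetical):

java -DpublisherEndpoint=10.0.0.4:5555 -DvmNumber=9 -cp perf-test.jar subscribers.Subscriber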

@fbacchella (Contributor)

Because of Java's GC, the high percentile values can be surprising. You can have a look at issue #723, where we track occasional high-latency events. I explained my methodology at #723 (comment).
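
Not part of the linked methodology, just a minimal sketch of one way to check whether GC pauses line up with the latency spikes: poll the JVM's GarbageCollectorMXBeans periodically and log the per-interval increase in cumulative collection count and time.

import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

public class GcActivityLogger {
    public static void main(String[] args) throws InterruptedException {
        long lastTimeMs = 0;
        long lastCount = 0;
        while (true) {
            long totalTimeMs = 0;
            long totalCount = 0;
            // both counters are cumulative since JVM start, so log the delta per interval
            for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
                totalTimeMs += gc.getCollectionTime();
                totalCount += gc.getCollectionCount();
            }
            if (totalCount > lastCount) {
                System.out.printf("GC: +%d collections, +%d ms spent in GC%n",
                        totalCount - lastCount, totalTimeMs - lastTimeMs);
            }
            lastCount = totalCount;
            lastTimeMs = totalTimeMs;
            Thread.sleep(1000);
        }
    }
}

Running something like this alongside the publisher and subscribers (or correlating the JVM's own GC log, e.g. -Xlog:gc) makes it easier to see whether latency spikes coincide with collections.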

@fbacchella (Contributor)

Are you sure your chrony/NTP setup is working perfectly? Does Azure give any guarantees on network latency?

@rajputChaturya (Author) commented Feb 9, 2022

Yes, the chrony setup is working fine; we can assume a ±1 ms error in the results due to synchronization.

Moreover, I ran the tests many times, and there is always skew in the results, but the VMs getting high values are different every time, which supports the conclusion that synchronization is not the problem here.

     name  min  max       mean   p50   p75   p95   p98   p99  p999      count     mean_rate
0   vm.27   -2   10  -0.504127  -1.0   0.0   1.0   1.0   1.0   9.0  289216000  39997.126042
1   vm.14    0    9   0.124599   0.0   0.0   1.0   1.0   1.0   6.0  288215806  39996.514824
2   vm.28   -1   13   0.424390   1.0   1.0   1.0   1.0   1.0  13.0  289210000  39996.232810
3   vm.31   -1   10  -0.532000  -1.0   0.0   0.0   0.0   2.0   8.0  289408000  39995.965755
4   vm.11    0   11   1.045031   1.0   1.0   1.0   2.0   2.0   8.0  288044000  40000.203561
5   vm.12    0   16   1.103232   1.0   1.0   1.0   2.0   2.0  16.0  288023900  39997.529378
6   vm.18    0   14   1.290553   1.0   2.0   2.0   2.0   2.0  14.0  288444000  40000.510229
7   vm.17    1   14   1.767484   2.0   2.0   2.0   3.0   6.0  14.0  288444000  40000.466059
8   vm.19   -3   27  -1.951474  -2.0  -2.0  -1.0   0.0   7.0  25.0  288644000  40000.231504
9   vm.13    0   20   0.643688   1.0   1.0   1.0   4.0   7.0  11.0  288044000  40000.490240
10  vm.24   -2   26   2.507308  -1.0   6.0  13.0  14.0  15.0  26.0  288790000  39992.751528
11  vm.30   -1   21   2.280707   0.0   2.0  13.0  15.0  16.0  17.0  289386000  39993.107380
12  vm.32   -1   21   2.427096   0.0   3.0  13.0  15.0  16.0  20.0  289376000  39991.719473
13  vm.26   -3   19   1.403257  -2.0   5.0  14.0  16.0  17.0  19.0  288942000  39986.134034
14  vm.25   -3   19   1.738068  -2.0   6.0  15.0  16.0  17.0  19.0  288597720  39938.499157
15  vm.15    0   19   6.143633   5.0  11.0  15.0  17.0  17.0  19.0  288190000  39992.801370
16  vm.22   -2   21   2.770279  -1.0   7.0  15.0  17.0  18.0  20.0  288774000  39990.699343
17  vm.16    1   20   6.549807   5.0  11.0  16.0  17.0  18.0  20.0  288390000  39992.817408
18   vm.8    0   24  10.032538  10.0  15.0  19.0  20.0  20.0  22.0  287590794  39993.151167
19  vm.20   -3   22   5.174700   0.0  14.0  19.0  20.0  21.0  22.0  288590000  39992.864021
20  vm.21   -3   22   5.808468   3.0  15.0  19.0  20.0  21.0  22.0  288546000  39986.828140
21  vm.29   -1   24   4.228670   0.0   8.0  20.0  21.0  22.0  23.0  289184000  39992.606414
22  vm.23   -3   32   3.573358   1.0   8.0  16.0  18.0  23.0  32.0  288792348  39992.979592
23  vm.10    1   26  15.150338  16.0  21.0  23.0  24.0  24.0  26.0  287754000  39987.198175
24   vm.9    1   28  15.894782  18.0  22.0  24.0  25.0  26.0  28.0  287652000  39973.655796

Here are the results from another run (a 2-hour test); as you can see, the VMs getting high values are different this time.
