Connect waitForCompletion hangs when broker disconnects quickly after accepting #1013

Open
2 tasks done
fnordian opened this issue Sep 7, 2023 · 0 comments

fnordian commented Sep 7, 2023

  • Bug exists in Release Version 1.2.5 (Master Branch)
  • Bug exists in MQTTv3 Client on Snapshot Version 1.2.6-SNAPSHOT (Develop Branch)
  • [?] Bug exists in MQTTv5 Client on Snapshot Version 1.2.6-SNAPSHOT (Develop Branch)

There is a race condition that makes waitForCompletion hang forever if the broker closes a connection quickly after accepting it.

The code below can reproduce it, but because it is a race condition, the hang does not occur on every run. The bug has been reproduced when the test hangs.

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

@Slf4j
public class PahoReconnectTest {

    private ServerSocket serverSocket = new ServerSocket(0);
    private int port = serverSocket.getLocalPort();

    @Getter
    @Setter
    private volatile long disconnectInterval = 0;


    public PahoReconnectTest() throws IOException {
        new Thread(() -> {
            try {
                dummyMqttBroker();
            } catch (IOException | InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).start();

    }
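    // Minimal fake broker: accepts a connection, answers with CONNACK, then closes the socket.
    private void dummyMqttBroker() throws IOException, InterruptedException {

        // Start at 1 ms; the interval doubles with every accepted connection.
        disconnectInterval = 1;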

        while (!Thread.currentThread().isInterrupted()) {
            log.info("accepting");
            Socket clientSocket = serverSocket.accept();

            new Thread(() -> {

                try {
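                    log.info("accepted new client");
                    // Read up to 22 bytes of the client's CONNECT packet; the content is ignored.
                    clientSocket.getInputStream().read(new byte[22], 0, 22);
                    PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
                    // Reply with CONNACK: type 0x20, remaining length 2, no session present, return code 0 (accepted).
                    out.write(new char[]{0x20, 0x02, 0x00, 0x00});
                    out.flush();
                    // Close immediately while the interval is at most 20 ms; after that, wait that long before closing.
                    if (disconnectInterval > 20) {
                        Thread.sleep(disconnectInterval);
                    }
                    disconnectInterval *= 2;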
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    try {
                        clientSocket.close();
                    } catch (IOException e) {
                        log.error("unable to close client socket", e);
                    }
                }
            }).start();
        }
    }


    @Test
    public void testPahoReconnect() throws MqttException, InterruptedException {
        String uri = "tcp://127.0.0.1:" + port;
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setMaxReconnectDelay(10);


        for (int i = 0; i < 30; i++) {
            try {
                log.info("creating new client");

                IMqttAsyncClient client = new MqttAsyncClient(uri, "test", null);
                client.connect(mqttConnectOptions).waitForCompletion();

                Thread.sleep(30);
                client.close();
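            } catch (MqttException mqttException) {
                // Connect failures are tolerated here; only a hang demonstrates the bug.
                log.warn("connect failed", mqttException);
            }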

        }
    }
}
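
For anyone reading the reproducer, the four bytes the dummy broker writes (0x20, 0x02, 0x00, 0x00) are meant to be a valid MQTT 3.1.1 CONNACK, so the client sees an accepted connection right before the socket is closed. Below is a minimal, standalone sketch that decodes them. The names ConnackDecoder and Connack are made up for illustration and are not part of Paho; the field layout follows my reading of the MQTT 3.1.1 specification.

```java
/** Illustrative decoder for a 4-byte MQTT 3.1.1 CONNACK packet (not Paho code). */
final class ConnackDecoder {

    /** Hypothetical holder for the decoded fields. */
    static final class Connack {
        final int packetType;         // upper nibble of byte 0; 2 means CONNACK
        final int remainingLength;    // byte 1; always 2 for CONNACK
        final boolean sessionPresent; // bit 0 of byte 2
        final int returnCode;         // byte 3; 0 means "connection accepted"

        Connack(int packetType, int remainingLength, boolean sessionPresent, int returnCode) {
            this.packetType = packetType;
            this.remainingLength = remainingLength;
            this.sessionPresent = sessionPresent;
            this.returnCode = returnCode;
        }
    }

    static Connack decode(byte[] packet) {
        if (packet == null || packet.length != 4) {
            throw new IllegalArgumentException("a CONNACK packet is exactly 4 bytes long");
        }
        int packetType = (packet[0] & 0xF0) >> 4;
        int remainingLength = packet[1] & 0xFF;
        boolean sessionPresent = (packet[2] & 0x01) != 0;
        int returnCode = packet[3] & 0xFF;
        return new Connack(packetType, remainingLength, sessionPresent, returnCode);
    }

    public static void main(String[] args) {
        // The same bytes the dummy broker in the reproducer sends.
        byte[] fromDummyBroker = {0x20, 0x02, 0x00, 0x00};
        Connack c = decode(fromDummyBroker);
        System.out.println("packetType=" + c.packetType
                + " remainingLength=" + c.remainingLength
                + " sessionPresent=" + c.sessionPresent
                + " returnCode=" + c.returnCode);
    }
}
```

The bytes decode to packet type 2 (CONNACK), remaining length 2, session present false, and return code 0, i.e. the broker accepts the connection and then drops it.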