We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
There is a race-condition that lets waitForCompletion hang forever if the broker closes a connection quickly after accepting it.
The code below can reproduce it, but since it's a race-condition it does not occur reliably. The bug is demonstrated when the test hangs.
import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttActionListener; import org.eclipse.paho.client.mqttv3.IMqttAsyncClient; import org.eclipse.paho.client.mqttv3.IMqttToken; import org.eclipse.paho.client.mqttv3.MqttAsyncClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.io.IOException; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; @Slf4j public class PahoReconnectTest { private ServerSocket serverSocket = new ServerSocket(0); private int port = serverSocket.getLocalPort(); @Getter @Setter private volatile long disconnectInterval = 0; public PahoReconnectTest() throws IOException { new Thread(() -> { try { dummyMqttBroker(); } catch (IOException | InterruptedException e) { throw new RuntimeException(e); } }).start(); } private void dummyMqttBroker() throws IOException, InterruptedException { port = serverSocket.getLocalPort(); disconnectInterval = 1; while (!Thread.currentThread().isInterrupted()) { log.info("accepting"); Socket clientSocket = serverSocket.accept(); new Thread(() -> { try { log.info("accepted new client"); clientSocket.getInputStream().read(new byte[22], 0, 22); PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true); out.write(new char[]{0x20, 0x02, 0x00, 0x00}); out.flush(); if (disconnectInterval > 20) { Thread.sleep(disconnectInterval); } disconnectInterval *= 2; } catch (Exception e) { throw new RuntimeException(e); } finally { try { clientSocket.close(); } catch (IOException e) { log.error("unable to close client socket", e); } } }).start(); } } @Test public void testPahoReconnect() throws MqttException, InterruptedException { String uri = "tcp://127.0.0.1:" + port; MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setMaxReconnectDelay(10); for (int i = 0; i < 30; i++) { try { log.info("creating new client"); IMqttAsyncClient client = new MqttAsyncClient(uri, "test", null); client.connect(mqttConnectOptions).waitForCompletion(); Thread.sleep(30); client.close(); } catch (MqttException mqttException) { } } } }
The text was updated successfully, but these errors were encountered:
No branches or pull requests
There is a race-condition that lets waitForCompletion hang forever if the broker closes a connection quickly after accepting it.
The code below can reproduce it, but since it's a race-condition it does not occur reliably. The bug is demonstrated when the test hangs.
The text was updated successfully, but these errors were encountered: