Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connect already in progress, #1035

Open
3 tasks done
marinesky opened this issue Apr 30, 2024 · 0 comments
Open
3 tasks done

Connect already in progress, #1035

marinesky opened this issue Apr 30, 2024 · 0 comments

Comments

@marinesky
Copy link

marinesky commented Apr 30, 2024

  • Bug exists Release Version 1.2.5 ( Master Branch)
  • Bug exists in MQTTv3 Client on Snapshot Version 1.2.6-SNAPSHOT (Develop Branch)
  • Bug exists in MQTTv5 Client on Snapshot Version 1.2.6-SNAPSHOT (Develop Branch)

Hi,we are using the Paho MQTT Java client analog pressure test, There is a certain probability when the client connects:Connect already in progress,

Here is My Code

public class Mqtt3Test {
    public static void main(String[] args)  {
        Integer begin =0;
        Integer end =10000;
        String defaultServer ="127.0.0.1:1883";
        try {
            CountDownLatch latch = new CountDownLatch(1);
            for (int i=begin;i<end;i++){


                Mqtt3 mqtt = new Mqtt3(defaultServer, i+"");

                mqtt.start();
                Thread.sleep(100);
            }

            latch.await();
        } catch (Exception e){
            System.out.println(" ChairTest error "+e.getMessage());
        }
    } }

`

public class Mqtt3 {

private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

public MqttClient client;
private MqttConnectOptions options;

private String serverURL;
private String id;



public Mqtt3(String serverURL, String id) {
    this.id=id;
    this.serverURL =serverURL;
    init();
}





private void init() {
    try {
        this.client = new MqttClient("tcp://" + serverURL, id, new MemoryPersistence());
        client.setTimeToWait(10*1000);
        this.options = new MqttConnectOptions();
        this.options.setCleanSession(true);
        this.options.setConnectionTimeout(10);
        this.options.setAutomaticReconnect(false);
        this.options.setKeepAliveInterval(20);
        this.client.setCallback((MqttCallback)new MqttCallbackExtended() {
            public void connectComplete(boolean reconnect, String serverURI) {

            }

            public void connectionLost(Throwable cause) {

            }

            public void messageArrived(String topic, MqttMessage message) throws Exception {

            }

            public void deliveryComplete(IMqttDeliveryToken token) {}
        });
    } catch (Exception e) {
        System.out.println("initException:" + e.getMessage());
    }
}

public void shutdown() {
    try {
        client.disconnectForcibly();
    } catch (Exception e) {
        System.out.println("clientId:"+id+",disconnectException:" + e.getMessage());
    }
    try {
        client.close(true);
    } catch (Exception e) {
        System.out.println("clientId:"+id+",closeException:" + e.getMessage());
    }

}

public void start() {
    try {
        startAutoConnectCheck();
        client.connect(options);

    } catch (Exception e) {
        System.out.println("clientId:"+id+",startException:" + e.getMessage());
    }
}

public boolean isShutDown() {
    if (!client.isConnected())
        return true;
    return false;
}


public void startAutoConnectCheck() {
    executorService.scheduleAtFixedRate(this::checkConnection, 60, 60, TimeUnit.SECONDS);
}

private void checkConnection() {
    if (isShutDown()){
        try {
            client.connect(options);
        } catch (Exception e) {
            System.out.println( "clientId:" + id  + " checkConnectException " + e.getMessage());
            shutdown();
            client=null;
            options=null;
            init();
        }
    }
}

In the code above, I use connection checking to ensure that each client connects successfully, at best.However, the following results are extremely rare, causing the connection to not be completely released

clientId:8000,startException:Timed out waiting for a response from the server
clientId:8000 checkConnectException Connect already in progress
clientId:8000 ,closeException:Connect already in progress

Mqtt Server always can recevie Client Ping,and tcpdump confirm conack also send

"MQTT Ping: 8000 " #24144 prio=5 os_prio=0 tid=0x00007f7e3057a800 nid=0xb3f9c in Object.wait() [0x00007f78f53de000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.util.TimerThread.mainLoop(Timer.java:552)
- locked <0x00000000c5237fa0> (a java.util.TaskQueue)
at java.util.TimerThread.run(Timer.java:505)

"MQTT Call: 8000 " #24140 prio=5 os_prio=0 tid=0x00007f7e2c58a800 nid=0xb3f98 in Object.wait() [0x00007f78f50db000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:198)
- locked <0x00000000c4c04cc0> (a java.lang.Object)
at java.lang.Thread.run(Thread.java:748)

"MQTT Snd: 8000 " #24139 prio=5 os_prio=0 tid=0x00007f7e2c588800 nid=0xb3f97 in Object.wait() [0x00007f78f51dc000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at org.eclipse.paho.client.mqttv3.internal.ClientState.get(ClientState.java:836)
- locked <0x00000000c4c04cf0> (a java.lang.Object)
at org.eclipse.paho.client.mqttv3.internal.CommsSender.run(CommsSender.java:151)
at java.lang.Thread.run(Thread.java:748)

"MQTT Rec: 8000 " #24138 prio=5 os_prio=0 tid=0x00007f7e2c586800 nid=0xb3f96 runnable [0x00007f78f52dd000]
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.net.SocketInputStream.read(SocketInputStream.java:224)
at java.io.DataInputStream.readByte(DataInputStream.java:265)
at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:160)
at java.lang.Thread.run(Thread.java:748)

I hope you can help me solve this problem,thank

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant