
SpringApplicationShutdownHook hangs in Mqttv5ClientManager stop() if client was never connected #9112

Closed
in-fke opened this issue May 3, 2024 · 7 comments


@in-fke

in-fke commented May 3, 2024

In what version(s) of Spring Integration are you seeing this issue?

spring-integration-mqtt-6.2.4

Describe the bug

Mqttv5ClientManager hangs in stop() if the client was never connected.
Mqttv3ClientManager may be affected as well.

see https://www.linkedin.com/pulse/mqtt-paho-tomcat-stop-using-shutdownsh-fails-sudhir-ravindramohan/

To Reproduce

  1. Instantiate Mqttv5ClientManager with automaticReconnect enabled while there is no MQTT broker to connect to
  2. Shut down the Spring application
  3. Pause in the debugger to see the hang, as follows:
Thread [SpringApplicationShutdownHook] (Suspended)	
	waiting for: Object  (id=171)	
	Object.wait(long) line: not available [native method]	
	ClientState.quiesce(long) line: 1497	
	ClientComms.disconnectForcibly(long, long, boolean, int, MqttProperties) line: 600	
	ClientComms.disconnectForcibly(long, long, int, MqttProperties) line: 576	
	MqttAsyncClient.disconnectForcibly(long, long, int, MqttProperties) line: 888	
	MqttAsyncClient.disconnectForcibly(long) line: 873	
	MqttListener$1.stop() line: 49	
	MqttListener$1(SmartLifecycle).stop(Runnable) line: 117	
	DefaultLifecycleProcessor.doStop(Map<String,Lifecycle>, String, CountDownLatch, Set<String>) line: 344	
	DefaultLifecycleProcessor$LifecycleGroup.stop() line: 483	
	0x0000021f959848c8.accept(Object) line: not available	
	TreeMap$Values(Iterable<T>).forEach(Consumer<? super T>) line: 75	
	DefaultLifecycleProcessor.stopBeans() line: 313	
	DefaultLifecycleProcessor.onClose() line: 214	
	AnnotationConfigServletWebServerApplicationContext(AbstractApplicationContext).doClose() line: 1136	
	AnnotationConfigServletWebServerApplicationContext(ServletWebServerApplicationContext).doClose() line: 174	
	AnnotationConfigServletWebServerApplicationContext(AbstractApplicationContext).close() line: 1090	
	SpringApplicationShutdownHook.closeAndWait(ConfigurableApplicationContext) line: 145	
	0x0000021f95984230.accept(Object) line: not available	
	LinkedHashSet<E>(Iterable<T>).forEach(Consumer<? super T>) line: 75	
	SpringApplicationShutdownHook.run() line: 114	
	Thread.run() line: 840	

Expected behavior

Troubling code
https://github.com/spring-projects/spring-integration/blob/main/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java#L153
and possibly
https://github.com/spring-projects/spring-integration/blob/main/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java#L151

Sample

Replacing the line

client.disconnectForcibly(getDisconnectCompletionTimeout());

with

if (client.isConnected()) {
    client.disconnect(0).waitForCompletion(getDisconnectCompletionTimeout());
}

makes it work in this case. Not sure whether that would be a universal solution. Not tested with Mqttv3ClientManager.
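A minimal sketch of the "disconnect only if connected" guard, using the JDK alone. `DisconnectableClient`, `FakeClient`, and `GuardedStop.stop()` are hypothetical stand-ins invented for illustration; they are not the Paho `MqttAsyncClient` API or Spring Integration code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal view of an MQTT client; not the Paho API. */
interface DisconnectableClient {
    boolean isConnected();

    void disconnect(long completionTimeoutMillis) throws Exception;
}

/** Hypothetical stand-in that records which calls were made. */
class FakeClient implements DisconnectableClient {
    final List<String> calls = new ArrayList<>();
    private final boolean connected;

    FakeClient(boolean connected) {
        this.connected = connected;
    }

    @Override
    public boolean isConnected() {
        calls.add("isConnected");
        return connected;
    }

    @Override
    public void disconnect(long completionTimeoutMillis) {
        calls.add("disconnect(" + completionTimeoutMillis + ")");
    }
}

final class GuardedStop {
    private GuardedStop() {
    }

    /**
     * Disconnects only when the client reports a live connection, so stopping a
     * client that never connected skips the disconnect path entirely.
     * Returns true if a disconnect was attempted.
     */
    static boolean stop(DisconnectableClient client, long completionTimeoutMillis) {
        if (!client.isConnected()) {
            return false; // never connected: nothing to disconnect, return immediately
        }
        try {
            client.disconnect(completionTimeoutMillis);
        }
        catch (Exception ex) {
            // In this sketch a failed disconnect during shutdown is reported and ignored
            System.err.println("Disconnect failed: " + ex);
        }
        return true;
    }
}
```

As the later comments in this thread point out, such a guard alone does not cancel the Paho client's internal reconnect timer.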

@in-fke in-fke added status: waiting-for-triage The issue need to be evaluated and its future decided type: bug labels May 3, 2024
@artembilan
Member

Sounds reasonable.
Feel free to contribute the fix: https://github.com/spring-projects/spring-integration/blob/main/CONTRIBUTING.adoc

However, I wonder why you changed disconnectForcibly() to disconnect(0)?

@artembilan artembilan added this to the 6.3.0 milestone May 3, 2024
@artembilan artembilan added in: mqtt for: backport-to-6.1.x for: backport-to-6.2.x and removed status: waiting-for-triage The issue need to be evaluated and its future decided labels May 3, 2024
@in-fke
Author

in-fke commented May 6, 2024

Feel free to contribute the fix

Hey, thanks for the quick follow-up. I have yet to gather more insight into this; I am tempted to contribute, but my time is limited.
Meanwhile, I found this in Paho: eclipse/paho.mqtt.java#1036

@artembilan
Member

Right. So, until the fix is done in the Paho client, let's consider having a workaround as you have suggested:

if (client.isConnected()) {
    client.disconnectForcibly(getDisconnectCompletionTimeout());
}

It looks like all the MQTT channel adapters have to be fixed, plus their respective ClientManager implementations.

@in-fke
Author

in-fke commented May 6, 2024

So, until the fix is done in the Paho client, let's consider having a workaround

Well, the actual bug I stumbled upon in the Paho client seems to be this one:
eclipse/paho.mqtt.java#686

@artembilan
Member

So, you mean that condition won't be enough, and we may end up with threads hanging in the Paho client in a "Connect already in progress" state?
Then we need to find a way to cancel all those tasks.
Perhaps something like a ScheduledExecutorService managed by the Spring application context and injected down into that MqttAsyncClient could help prevent those threads from hanging around when we stop our application?
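The executor-ownership idea can be sketched with the JDK alone. The hypothetical `ReconnectingComponent` below runs simulated reconnect attempts on a `ScheduledExecutorService` that it owns and shuts down in `stop()`, so no worker thread outlives the component. This does not use Paho; as the following comments find, Paho's reconnect timer is a separate `java.util.Timer` that such an executor would not cover.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical component whose background retries run on an executor it owns. */
class ReconnectingComponent {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "reconnect-demo");
                t.setDaemon(false); // non-daemon: would keep the JVM alive if never shut down
                return t;
            });

    final AtomicInteger attempts = new AtomicInteger();
    private ScheduledFuture<?> reconnectTask;

    void start() {
        // Simulate a broker that is never reachable: "retry" every 10 ms, forever
        reconnectTask = executor.scheduleWithFixedDelay(
                attempts::incrementAndGet, 0, 10, TimeUnit.MILLISECONDS);
    }

    /** Cancels pending retries and waits up to one second for the worker thread to exit. */
    boolean stop() {
        if (reconnectTask != null) {
            reconnectTask.cancel(false);
        }
        executor.shutdownNow();
        try {
            return executor.awaitTermination(1, TimeUnit.SECONDS);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve interrupt status for the caller
            return false;
        }
    }

    boolean isTerminated() {
        return executor.isTerminated();
    }
}
```

The design choice is that whoever creates the executor also shuts it down, which is what a container-managed bean would guarantee on application close.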

@in-fke
Author

in-fke commented May 7, 2024

Not sure whether we are going too deep down the rabbit hole... but here goes:
org.eclipse.paho.mqttv5.client.MqttAsyncClient.reconnectTimer
eclipse/paho.mqtt.java#1036 (comment)

Maybe close() should invoke stopReconnectCycle()?

org.eclipse.paho.mqttv5.client.MqttAsyncClient.executorService
https://github.com/eclipse/paho.mqtt.java/blob/master/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttAsyncClient.java#L249
can be injected.

org.eclipse.paho.mqttv5.client.internal.ClientComms.executorService
https://github.com/eclipse/paho.mqtt.java/blob/master/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientComms.java#L85
is passed through from MqttAsyncClient.

I didn't look at MQTTv3.

@artembilan
Member

Yeah... I see, that doesn't help:

    public Timer(String name, boolean isDaemon) {
        var threadReaper = new ThreadReaper(queue, thread);
        this.cleanup = CleanerFactory.cleaner().register(this, threadReaper);
        thread.setName(name);
        thread.setDaemon(isDaemon);
        thread.start();
    }

So, this Timer starts a new thread, and if there is no connection, it is never going to be canceled 😢.
And it looks like there is no way for us to intervene there unless we use reflection to reach that stopReconnectCycle().

WDYT? Is it worth pursuing such a workaround to avoid leaking these never-ending timers in the Paho library?
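The Timer behavior described above can be reproduced with the JDK alone: a `java.util.Timer` created as non-daemon keeps its thread alive until `cancel()` is called. `TimerLeakDemo` and the thread name used with it are hypothetical; this sketches the JDK behavior only, not Paho's own timer code.

```java
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper showing that a java.util.Timer thread lives until cancel(). */
final class TimerLeakDemo {
    private TimerLeakDemo() {
    }

    /** Finds a live thread by name (Timer does not expose its thread publicly). */
    static Optional<Thread> findThread(String name) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.getName().equals(name))
                .findFirst();
    }

    /** Starts a Timer that "retries" every 10 ms, like an endless reconnect loop. */
    static Timer startRetryTimer(String name, AtomicInteger attempts) {
        // Explicitly non-daemon: this thread alone keeps the JVM running until cancel()
        Timer timer = new Timer(name, false);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                attempts.incrementAndGet(); // a real client would try to reconnect here
            }
        }, 0, 10);
        return timer;
    }

    /** Cancels the timer and waits up to one second for its thread to exit. */
    static boolean cancelAndAwait(Timer timer, Thread timerThread) {
        timer.cancel();
        try {
            timerThread.join(1000);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return !timerThread.isAlive();
    }
}
```

Without the `cancel()` call, the thread found by `findThread` stays alive for as long as the task keeps being rescheduled.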

spring-builds pushed a commit that referenced this issue May 21, 2024
Fixes: #9112

`Mqttv5ClientManager` hangs in `stop()` if never was connected.
The scheduled reconnect timer in the client is never cancelled.

* Call `stopReconnectCycle()` on the client via reflection when we disconnect
from the client in Spring Integration MQTT components

(cherry picked from commit 937da13)
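The commit calls `stopReconnectCycle()` through reflection because, per the discussion above, it cannot be reached directly. The general pattern is sketched below against a hypothetical `LegacyClient` stand-in whose private `stopReconnectCycle()` cancels its own timer; `ReflectiveStopper` is likewise made up, and this is not the actual Spring Integration change.

```java
import java.lang.reflect.Method;
import java.util.Timer;

/** Hypothetical stand-in for a client whose reconnect timer can only be stopped privately. */
class LegacyClient {
    private Timer reconnectTimer = new Timer("legacy-reconnect", false);

    private void stopReconnectCycle() {
        if (reconnectTimer != null) {
            reconnectTimer.cancel(); // lets the timer thread exit
            reconnectTimer = null;
        }
    }

    boolean isReconnectCycleActive() {
        return reconnectTimer != null;
    }
}

final class ReflectiveStopper {
    private ReflectiveStopper() {
    }

    /**
     * Invokes a private no-arg method by name. getDeclaredMethod looks only at the
     * runtime class itself, not its superclasses. Returns false instead of throwing
     * if the method is missing or cannot be made accessible.
     */
    static boolean invokePrivateNoArg(Object target, String methodName) {
        try {
            Method method = target.getClass().getDeclaredMethod(methodName);
            method.setAccessible(true);
            method.invoke(target);
            return true;
        }
        catch (ReflectiveOperationException | RuntimeException ex) {
            return false;
        }
    }
}
```

Returning `false` rather than propagating keeps a shutdown path from failing if a future library version renames or removes the private method; real code would probably also log that case.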
artembilan added a commit that referenced this issue May 21, 2024
Fixes: #9112

`Mqttv5ClientManager` hangs in `stop()` if never was connected.
The scheduled reconnect timer in the client is never cancelled.

* Call `stopReconnectCycle()` on the client via reflection when we disconnect
from the client in Spring Integration MQTT components

(cherry picked from commit 937da13)

# Conflicts:
#	spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java
#	spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java
#	spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java
#	spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/MqttPahoMessageHandler.java

3 participants