The official example code is misleading and may cause Java OutOfMemoryError. #794

Open
xiaokxluoshu opened this issue Nov 17, 2023 · 1 comment

xiaokxluoshu commented Nov 17, 2023

public final class EmbeddedLauncher {

static class PublisherListener extends AbstractInterceptHandler {

    @Override
    public String getID() {
        return "EmbeddedLauncherPublishListener";
    }

    @Override
    public void onPublish(InterceptPublishMessage msg) {
        final String decodedPayload = msg.getPayload().toString(UTF_8);
        System.out.println("Received on topic: " + msg.getTopicName() + " content: " + decodedPayload);
    }
}

public static void main(String[] args) throws InterruptedException, IOException {
    IResourceLoader classpathLoader = new ClasspathResourceLoader();
    final IConfig classPathConfig = new ResourceLoaderConfig(classpathLoader);

    final Server mqttBroker = new Server();
    List<? extends InterceptHandler> userHandlers = Collections.singletonList(new PublisherListener());
    mqttBroker.startServer(classPathConfig, userHandlers);

    System.out.println("Broker started press [CTRL+C] to stop");
    // Bind a shutdown hook
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        System.out.println("Stopping broker");
        mqttBroker.stopServer();
        System.out.println("Broker stopped");
    }));

    Thread.sleep(20000);
    System.out.println("Before self publish");
    MqttPublishMessage message = MqttMessageBuilders.publish()
        .topicName("/exit")
        .retained(true)
        .qos(MqttQoS.EXACTLY_ONCE)
        .payload(Unpooled.copiedBuffer("Hello World!!".getBytes(UTF_8)))
        .build();

    mqttBroker.internalPublish(message, "INTRLPUB");
    System.out.println("After self publish");
}

private EmbeddedLauncher() {
}
}

PublisherListener extends AbstractInterceptHandler, but its onPublish override never calls msg.getPayload().release(). Anyone who copies this example into their own handler will leave the payload buffer of every published message unreleased, and the broker will eventually fail with an OutOfMemoryError after running for some time.
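To illustrate the concern without depending on Moquette or Netty, here is a minimal, library-free sketch of reference counting. `CountedPayload`, `leakyHandler`, `releasingHandler` and `countUnreleased` are hypothetical names invented for this illustration; they are not part of either library, and the real buffer behaviour may differ in detail.

```java
import java.util.function.Consumer;

public class RefCountSketch {

    /** Hypothetical stand-in for a reference-counted buffer (NOT Netty's ByteBuf). */
    static final class CountedPayload {
        private int refCnt = 1; // a new payload starts with one reference
        private final String content;

        CountedPayload(String content) {
            this.content = content;
        }

        String content() {
            return content;
        }

        int refCnt() {
            return refCnt;
        }

        /** Drops one reference; returns true once the count reaches zero. */
        boolean release() {
            if (refCnt == 0) {
                throw new IllegalStateException("payload already released");
            }
            refCnt--;
            return refCnt == 0;
        }
    }

    /** Mirrors the listener in the issue: reads the payload but never releases it. */
    static void leakyHandler(CountedPayload payload) {
        payload.content();
    }

    /** Reads the payload and releases it in finally, so it is released even on failure. */
    static void releasingHandler(CountedPayload payload) {
        try {
            payload.content();
        } finally {
            payload.release();
        }
    }

    /** Delivers n payloads to the handler and counts those still holding a reference. */
    static int countUnreleased(int n, Consumer<CountedPayload> handler) {
        int unreleased = 0;
        for (int i = 0; i < n; i++) {
            CountedPayload payload = new CountedPayload("msg-" + i);
            handler.accept(payload);
            if (payload.refCnt() > 0) {
                unreleased++;
            }
        }
        return unreleased;
    }

    public static void main(String[] args) {
        System.out.println("leaky handler, unreleased: "
                + countUnreleased(1000, RefCountSketch::leakyHandler));
        System.out.println("releasing handler, unreleased: "
                + countUnreleased(1000, RefCountSketch::releasingHandler));
    }
}
```

In this model every message passed to the leaky handler keeps its reference, so the number of unreleased payloads grows with the number of messages, which is the pattern that would eventually exhaust memory in a long-running broker.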

Actual behavior

Steps to reproduce

Minimal yet complete reproducer code (or URL to code) or complete log file

Moquette MQTT version

0.17

JVM version (e.g. java -version)

1.8.0_331

OS version (e.g. uname -a)

win11

xiaokxluoshu (Author) commented

I suggest changing the example as follows so that it gives developers proper guidance:

static class PublisherListener extends AbstractInterceptHandler {

    @Override
    public String getID() {
        return "EmbeddedLauncherPublishListener";
    }

    @Override
    public void onPublish(InterceptPublishMessage msg) {
        final String decodedPayload = msg.getPayload().toString(UTF_8);
        System.out.println("Received on topic: " + msg.getTopicName() + " content: " + decodedPayload);
        // Delegate to AbstractInterceptHandler.onPublish so the payload buffer gets released.
        super.onPublish(msg);
    }
}
