Auto assigned subscription id isn't sent to broker #1019

Open
2 of 3 tasks
TDYJeffreyDevloo opened this issue Sep 28, 2023 · 1 comment

Comments

@TDYJeffreyDevloo

  • Bug exists in Release Version 1.2.5 (Master Branch)
  • Bug exists in MQTTv3 Client on Snapshot Version 1.2.6-SNAPSHOT (Develop Branch)
  • Bug exists in MQTTv5 Client on Snapshot Version 1.2.6-SNAPSHOT (Develop Branch)

Auto assigned subscription identifier isn't sent to the broker

When subscribing with the MQTTv5 client, the handling of the subscription identifier has two problems:

  • The wrong property (the publish-only list) is used to check whether an identifier was supplied
  • The auto-incremented value is not set on the subscription properties for the subscribe request

A simple demo that demonstrates the issue:

import com.hivemq.client.mqtt.datatypes.MqttQos;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

public class TestPaho
{

  public static void main(String[] args) throws InterruptedException, MqttException
  {
    MqttAsyncClient serverClient = new MqttAsyncClient("tcp://localhost:1883", "myServer");
    MqttAsyncClient deviceClient = new MqttAsyncClient("tcp://localhost:1883", "myDevice");

    serverClient.connect().waitForCompletion();
    deviceClient.connect().waitForCompletion();

    System.out.println("Connected");

    CountDownLatch receivedLatch = new CountDownLatch(4);

    MqttProperties subProperties1 = new MqttProperties();
    subProperties1.setSubscriptionIdentifiers(List.of(0)); // Bug in Paho? This list is meant for PUBLISH, but subscribe() requires it
    // subProperties1.setSubscriptionIdentifier(1);
    serverClient.subscribe(new MqttSubscription("#", MqttQos.AT_LEAST_ONCE.getCode()), null, null, (topic, message) ->
    {
      System.out.println("# - Received message");
      receivedLatch.countDown();
    }, subProperties1);

    MqttProperties subProperties2 = new MqttProperties();
    subProperties2.setSubscriptionIdentifiers(List.of(0));
    // subProperties2.setSubscriptionIdentifier(2);
    serverClient.subscribe(new MqttSubscription("A", MqttQos.AT_LEAST_ONCE.getCode()), null, null, (topic, message) ->
    {
      System.out.println("A - Received message");
      receivedLatch.countDown();
    }, subProperties2);

    System.out.println("A - Sending");
    deviceClient.publish("A", new MqttMessage("1".getBytes(), MqttQos.AT_LEAST_ONCE.getCode(), false, new MqttProperties()))
            .waitForCompletion();

    System.out.println("A - Sent");

    receivedLatch.await();
    deviceClient.close();
    serverClient.close();
  }
}

Any user of the API should be using MqttProperties#setSubscriptionIdentifier, regardless of Automatic Subscription Identifier Assignment.
Within subscribe, the property to check is MqttProperties#getSubscriptionIdentifier, not the list.
The Javadoc states:

	/**
	 * Subscription Identifiers. (Publish Only)
	 * 
	 * <p>
	 * The Subscription Identifiers are associated with any subscription created or
	 * modified as the result of a SUBSCRIBE packet. If a subscription was made with
	 * a Subscription Identifier, then any incoming messages that match that
	 * subscription will contain the associated subscription identifier, if the
	 * incoming message matches multiple subscriptions made by the same client, then
	 * it will contain a list of all associated subscription identifiers. This
	 * property is ONLY for PUBLISH packets. For a Subscription Identifier sent in a
	 * SUBSCRIBE packet, see {@link MqttProperties#getSubscriptionIdentifier()}
	 * </p>
	 * 
	 * @return A {@link List} of Subscription Identifiers.
	 */
	public List<Integer> getSubscriptionIdentifiers() {
		return publishSubscriptionIdentifiers;
	}
...

	/**
	 * Subscription Identifier. (Subscribe Only)
	 * 
	 * <p>
	 * The Subscription identifier field can be set on a SUBSCRIBE packet and will
	 * be returned with any incoming PUBLISH packets that match the associated
	 * subscription. This property is ONLY for SUBSCRIBE packets. For Subscription
	 * Identifier(s) sent in a PUBLISH packet, see
	 * {@link MqttProperties#getSubscriptionIdentifiers()}
	 * </p>
	 * 
	 * @return The Subscription Identifier.
	 */
	public Integer getSubscriptionIdentifier() {
		return subscribeSubscriptionIdentifier;
	}
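
To make the difference between the two getters concrete, here is a minimal sketch that does not use Paho at all. `Props` is a hypothetical stand-in for MqttProperties that keeps only the two fields quoted above, and it assumes the list starts out empty. `lookupViaList` and `lookupViaProperty` are illustrative names, not Paho methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class SubIdLookupSketch {

    /** Hypothetical stand-in for MqttProperties: only the two fields under discussion. */
    static final class Props {
        // Assumption: the publish-only list starts out empty.
        private final List<Integer> publishSubscriptionIdentifiers = new ArrayList<>();
        private Integer subscribeSubscriptionIdentifier;

        List<Integer> getSubscriptionIdentifiers() { return publishSubscriptionIdentifiers; }
        Integer getSubscriptionIdentifier() { return subscribeSubscriptionIdentifier; }
        void setSubscriptionIdentifier(Integer id) { subscribeSubscriptionIdentifier = id; }
    }

    /** Reads the publish-only list, which is what the issue reports subscribe() doing. */
    static int lookupViaList(Props props) {
        int subId = 0;
        try {
            subId = props.getSubscriptionIdentifiers().get(0);
        } catch (IndexOutOfBoundsException e) {
            // The list is empty, so an identifier set for SUBSCRIBE is not seen here.
        }
        return subId;
    }

    /** Reads the subscribe-only property; a missing value is treated as 0. */
    static int lookupViaProperty(Props props) {
        return Optional.ofNullable(props.getSubscriptionIdentifier()).orElse(0);
    }

    public static void main(String[] args) {
        Props props = new Props();
        props.setSubscriptionIdentifier(7);
        System.out.println("via list:     " + lookupViaList(props));
        System.out.println("via property: " + lookupViaProperty(props));
    }
}
```

In this model, an identifier set through setSubscriptionIdentifier is invisible to the list-based lookup, which is why the demo has to fall back on setSubscriptionIdentifiers.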

The correct property aside, the newly chosen subId is used internally (https://github.com/eclipse/paho.mqtt.java/blob/master/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttAsyncClient.java#L1308) but is never set on the properties when sending the SUBSCRIBE message (https://github.com/eclipse/paho.mqtt.java/blob/master/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttAsyncClient.java#L1314).

@TDYJeffreyDevloo
Author

Patch for the issue:

Index: org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttAsyncClient.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttAsyncClient.java b/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttAsyncClient.java
--- a/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttAsyncClient.java	(revision f4e0db802a4433645ef011e711646a09ec9fae89)
+++ b/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttAsyncClient.java	(revision 7fcaefef9bf9ed5cfa68af4086f5f77aa47b81cc)
@@ -19,6 +19,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.util.Hashtable;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -1273,12 +1274,8 @@
 	public IMqttToken subscribe(MqttSubscription[] subscriptions, Object userContext, MqttActionListener callback,
 			IMqttMessageListener messageListener, MqttProperties subscriptionProperties) throws MqttException {
 
-		int subId = 0;
-		try {
-			subId = subscriptionProperties.getSubscriptionIdentifiers().get(0);
-		} catch (IndexOutOfBoundsException e) {
-			log.fine(CLASS_NAME, "subscribe", "No sub subscription property(s)");
-		}
+		int subId = Optional.ofNullable(subscriptionProperties.getSubscriptionIdentifier())
+			.orElse(0);
 		// Automatic Subscription Identifier Assignment is enabled
 		if (connOpts.useSubscriptionIdentifiers() && this.mqttConnection.isSubscriptionIdentifiersAvailable()) {
 
@@ -1294,9 +1291,15 @@
 			} else {
 				// Automatically assign new ID and link to callback.
 				subId = this.mqttSession.getNextSubscriptionIdentifier();
+				subscriptionProperties.setSubscriptionIdentifier(subId);
+			}
+		} else {
+			if (subId != 0) {
+				log.fine(CLASS_NAME, "subscribe", "Subscription identifiers are not available or disabled");
+				subscriptionProperties.setSubscriptionIdentifier(null);
 			}
 		}
-		
+
 		// add message handlers to the list for this client
 		for (MqttSubscription subscription : subscriptions) {
 			MqttTopicValidator.validate(subscription.getTopic(),
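
The branches of the patch can be modelled in isolation as follows. This is a rough sketch under stated assumptions, not the Paho implementation: `Props` and `Session` are hypothetical stand-ins, the counter starting at 1 is an assumption, and the flag `idsUsable` stands for `connOpts.useSubscriptionIdentifiers() && this.mqttConnection.isSubscriptionIdentifiersAvailable()`. The part of the method that the diff elides between the two hunks is not modelled; a caller-supplied identifier is simply kept.

```java
import java.util.Optional;

public class SubscribeIdFlowSketch {

    /** Hypothetical stand-in for the subscribe-only property on MqttProperties. */
    static final class Props {
        Integer subscriptionIdentifier;
    }

    /** Hypothetical stand-in for the session counter; starting at 1 is an assumption. */
    static final class Session {
        private int next = 1;
        int nextSubscriptionIdentifier() { return next++; }
    }

    /**
     * Returns the identifier used internally. After the call, props holds
     * what would be sent to the broker in the SUBSCRIBE packet.
     */
    static int resolve(Props props, Session session, boolean idsUsable) {
        int subId = Optional.ofNullable(props.subscriptionIdentifier).orElse(0);
        if (idsUsable) {
            if (subId == 0) {
                // Automatic assignment: also write the id back so it reaches the broker.
                subId = session.nextSubscriptionIdentifier();
                props.subscriptionIdentifier = subId;
            }
            // subId != 0: the caller's id is kept (elided checks are not modelled).
        } else if (subId != 0) {
            // Identifiers are disabled or unavailable: do not send one.
            props.subscriptionIdentifier = null;
        }
        return subId;
    }

    public static void main(String[] args) {
        Session session = new Session();
        Props auto = new Props();
        int id = resolve(auto, session, true);
        System.out.println("internal id: " + id + ", sent: " + auto.subscriptionIdentifier);
    }
}
```

The point of the write-back is that the internally tracked identifier and the one carried by the SUBSCRIBE properties stay equal, so incoming PUBLISH packets can be matched to the right callback.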
