Skip to content
This repository has been archived by the owner on Aug 13, 2020. It is now read-only.

Add clean_session parameter and command support to MQTTDynamicSubscriber #110

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from

Conversation

cergab
Copy link

@cergab cergab commented Jan 15, 2019

clean_session flag is set by default.
Add support for following commands: 'subscribe' (topic subscribe) and
'unsubscribe' (topic unsubscribe)
Other small updates

clean_session flag is set by default.
Add support for following commands: 'subscribe' (topic subscribe) and
'unsubscribe' (topic unsubscribe)
Other small updates
message["client_id"] = client_id
message = ""
for (client_id, mqtt) in self.mqtt_dict.iteritems():
if (calvinsys.can_read(mqtt)):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove superfluous parentheses (in all if-statements, not only here)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@condition(action_input=['client_id', 'uri', 'topic', 'qos'])
def update_topic(self, client_id, uri, topic, qos):
@condition(action_input=['client_id', 'uri', 'cmd', 'topic', 'qos'])
def update_topic(self, client_id, uri, cmd, topic, qos):
if (topic is None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Topic must contain at least one character.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

"type": "string"
},
},
"required": ["topic"]
Copy link
Contributor

@olaan olaan Jan 16, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Command would be better as an enum. Also, describe defaults (when qos and cmd missing.)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

.format(topic, status[0]))

if (not done and retry > 0):
time.sleep(0.2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you need delay or wait, either use thread or delayed call (see other calvinsys for usage). This usage will freeze the entire runtime during the sleep.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I said that we should have retries, I meant for the connection, but forgot that when using the loop_start reconnect is handled automatically. But what needs to be added is to resubscribe to all topics in the on_connect callback.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor

@olaan olaan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See notes.

self.data = []
clean_session = kwargs.get('clean_session', 'false').lower() == "true"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clean session that comes in should already be a boolean.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -245,37 +264,89 @@ def on_log_debug(client, string):
elif is_tls:
_log.warning("TLS configuration is missing!")

self.client.connect_async(host=hostname, port=port)
self.client.connect(host=hostname, port=port)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change this from async. This would block the runtime, please change back.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Add support for MQTTDynamicSubscriber actor migration
Add migration test using public MQTT broker http://test.mosquitto.org/
Other minor updates

def on_disconnect(client, userdata, rc):
_log.warning("MQTT broker {}:{} disconnected".format(hostname, port))

def on_message(client, userdata, message):
_log.info("New message {}".format(message))
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will be removed on the next commit

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants