Skip to content
This repository has been archived by the owner on Aug 13, 2020. It is now read-only.

Add clean_session parameter and command support to MQTTDynamicSubscriber #110

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
72 changes: 58 additions & 14 deletions calvin/actorstore/systemactors/net/MQTTDynamicSubscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,57 +35,101 @@ class MQTTDynamicSubscriber(Actor):
"auth": { "username": <username "password": <password> },
"will": { "topic": <topic>, "payload": <payload> },
"transport": <tcp or websocket>,
"clean_session": <True|False>
}

Input:
client_id : MQTT client ID
uri : MQTT broker URI (format: schema://host:port)
cmd : command keyword ('subscribe'|'unsubscribe')
topic : topic to subscribe to
qos : MQTT qos
Output:
message : dictionary {"topic": <topic>, "payload": <payload>, "client_id": <client id>}
"""
CMD_SUBSCRIBE = "subscribe"
CMD_UNSUBSCRIBE = "unsubscribe"

@manage(['settings', 'mqtt_dict'])
@manage(['settings', 'layout'])
def init(self, settings):
if not settings:
settings = {}
self.settings = settings
self.layout = {}
self.setup()

def setup(self):
self.mqtt_dict = {}
self.queue = []

def will_migrate(self):
for mqtt in self.mqtt_dict.itervalues():
calvinsys.close(mqtt)
self.mqtt_dict.clear()

def did_migrate(self):
self.setup()
layout = self.layout
self.layout = {}
for client_id, client_details in layout.iteritems():
details = dict(client_details)
uri = details["uri"]
for topic, qos in dict(details["topics"]).iteritems():
self._update_topic(client_id, uri, self.CMD_SUBSCRIBE, topic, qos)

"""
Read first available MQTT client message
Read all available MQTT clients for messages and store them in a FIFO queue
The reader will only read the first message in the queue.

@note The rest of the messages are expected at the next readings
"""

@stateguard(lambda self:
@stateguard(lambda self: self.queue or
any(calvinsys.can_read(mqtt) for mqtt in self.mqtt_dict.itervalues()))
@condition(action_output=['message'])
def read_message(self):
client_id, mqtt = next((client_id, mqtt)
for (client_id, mqtt) in self.mqtt_dict.iteritems()
if calvinsys.can_read(mqtt))
message = calvinsys.read(mqtt)
# add client id to the message
message["client_id"] = client_id
message = ""
for (client_id, mqtt) in self.mqtt_dict.iteritems():
if calvinsys.can_read(mqtt):
message = calvinsys.read(mqtt)
# add client id to the message
message["client_id"] = client_id
self.queue.append(message)
if self.queue:
message = self.queue.pop(0)
return (message,)

"""
Update MQTT subscribed topics for specific MQTT client
"""

@condition(action_input=['client_id', 'uri', 'topic', 'qos'])
def update_topic(self, client_id, uri, topic, qos):
if (topic is None):
@condition(action_input=['client_id', 'uri', 'cmd', 'topic', 'qos'])
def update_topic(self, client_id, uri, cmd, topic, qos):
self._update_topic(client_id, uri, cmd, topic, qos)

def _update_topic(self, client_id, uri, cmd, topic, qos):
if not topic:
_log.warning("Topic is missing!")
return

if (not client_id in self.mqtt_dict.keys()):
if not client_id in self.mqtt_dict.keys():
self.mqtt_dict[client_id] = calvinsys.open(self, "mqtt.subscribe",
client_id=client_id,
topics=[topic],
uri=uri,
qos=qos,
**self.settings)
calvinsys.write(self.mqtt_dict[client_id], {"topic":topic, "qos":qos})
if self.mqtt_dict[client_id]:
self.layout[client_id] = {"uri": uri, "topics":{}}

success = calvinsys.write(self.mqtt_dict[client_id],
{"cmd": cmd, "topic":topic, "qos":qos})
exist = topic in dict(self.layout[client_id]["topics"]).iterkeys()
if success:
if cmd == self.CMD_SUBSCRIBE and not exist:
self.layout[client_id]["topics"][topic] = qos
elif cmd == self.CMD_UNSUBSCRIBE:
del self.layout[client_id]["topics"][topic]

action_priority = (update_topic, read_message)
requires = ['mqtt.subscribe']
27 changes: 17 additions & 10 deletions calvin/actorstore/tests/test_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def can_write(self, ref):

def write(self, ref, data):
obj = self._get_capability_object(ref)
obj.write(data)
return obj.write(data)

def can_read(self, ref):
obj = self._get_capability_object(ref)
Expand Down Expand Up @@ -104,6 +104,7 @@ def init_done(self, actor_name):
if obj:
obj.start_verifying_calvinsys()


def fwrite(port, value):
if isinstance(value, Token):
port.queue.write(value, None)
Expand All @@ -126,6 +127,7 @@ def pwrite(actor, portname, value):
# TODO make all actors' test compliant and change to raise exception
pass


def pread(actor, portname, number=1):
port = actor.outports.get(portname, None)
assert port
Expand All @@ -138,7 +140,7 @@ def pread(actor, portname, number=1):
available = -9999
raise AssertionError("Too few tokens available, %d, expected %d" % (available, number))
else:
if pavailable(actor, portname, number+1):
if pavailable(actor, portname, number + 1):
raise AssertionError("Too many tokens available, expected %d" % number)

values = [port.queue.peek(actor.id).value for _ in range(number)]
Expand Down Expand Up @@ -177,9 +179,10 @@ def __init__(self, port):
def is_connected(self):
return True


def setup_calvinlib():
import calvin.runtime.north.calvinlib as calvinlib
calvinlib.TESTING=True
calvinlib.TESTING = True
from calvin.runtime.north.calvinlib import get_calvinlib
lib = get_calvinlib()
lib.init(capabilities={
Expand Down Expand Up @@ -262,7 +265,6 @@ def setup_calvinsys():
"attributes": {"data": [False, True, False, True]}
},


"io.buzzer": {
"module": "mock.MockOutput",
"attributes": {}
Expand Down Expand Up @@ -410,16 +412,20 @@ def setup_calvinsys():
"weather.local": {
"module": "mock.MockInputOutput",
"attributes": {"data": ["dummy"]}
},
"mqtt.subscribe": {
"module": "web.mqtt.Subscribe",
"attributes": {}
}
})
return sys


def teardown_calvinlib():
import calvin.runtime.north.calvinlib as calvinlib
calvinlib.TESTING=False
calvinlib.TESTING = False
del calvinlib._calvinlib
calvinlib._calvinlib=None
calvinlib._calvinlib = None


def teardown_calvinsys():
Expand Down Expand Up @@ -523,7 +529,7 @@ def test_actor(self, actor, aut):
f(aut)
except Exception as e:
print "Actor %s failed during setup of test %d: %s" % (actor, test_index, e.message)
raise Exception("Failed during setup of test %d" % (test_index, ))
raise Exception("Failed during setup of test %d" % (test_index,))

for port, values in inputs.iteritems():
pwrite(aut, port, values)
Expand All @@ -547,7 +553,7 @@ def test_actor(self, actor, aut):
raise AssertionError("Failed test %d" % (test_index,))

if not all(f(aut) for f in postconds):
raise AssertionError("Failed post condition of test %d" % (test_index, ))
raise AssertionError("Failed post condition of test %d" % (test_index,))

return True

Expand All @@ -573,6 +579,7 @@ def test_actors(self):
return {'pass': test_pass, 'fail': test_fail, 'skipped': no_test,
'errors': self.illegal_actors, 'components': self.components}


def merge_results(result1, result2):
result = {}
for k in result1.keys():
Expand All @@ -588,7 +595,7 @@ def merge_results(result1, result2):
def show_result(header, result):
print header
for actor in result:
print " %s" % (actor, )
print " %s" % (actor,)


def show_issue(header, result):
Expand Down Expand Up @@ -640,7 +647,7 @@ def test_actors(actor="", show=False, path=None):
show_issues(results)

if results['errors']:
raise Exception("%d actor(s) had errors" % (len(results['errors']), ))
raise Exception("%d actor(s) had errors" % (len(results['errors']),))
if results['fail']:
raise Exception("%d actor(s) failed tests" % (len(results['fail']),))

Expand Down
6 changes: 3 additions & 3 deletions calvin/runtime/north/calvinsys.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def write(self, ref, data):
obj = self._get_capability_object(ref)
try:
validate(data, obj.write_schema)
obj.write(data)
return obj.write(data)
except Exception as e:
_log.exception("Failed to validate schema for {}.write(), exception={}".format(obj.__class__.__name__, e))

Expand Down Expand Up @@ -206,7 +206,7 @@ def open(self, capability_name, actor, **kwargs):
if len(csobjects) == 0:
idx = 0
else :
idx = int(csobjects[-1].rsplit('#', 1)[1])+1
idx = int(csobjects[-1].rsplit('#', 1)[1]) + 1

ref = "{}#{}".format(actor.id, idx)
self._objects[ref] = {"name": capability_name, "obj": obj, "args": kwargs}
Expand Down Expand Up @@ -236,7 +236,7 @@ def serialize(self, actor):
serz = {}
for ref in references:
csobj = self._objects.get(ref)
state = csobj["obj"].serialize() # serialize object
state = csobj["obj"].serialize() # serialize object
serz[ref] = {"name": csobj["name"], "obj": state, "args": csobj["args"]}
return serz

Expand Down
25 changes: 25 additions & 0 deletions calvin/tests/simple_migration_tests/mqtt_subscriber.calvin
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
define QOS = 1
define SETTINGS = {"clean_session": "False"}

client_id : std.Constantify(constant="dummy_id")
uri : std.Constantify(constant="mqtt://test.mosquitto.org:1883")
cmd : std.Trigger(data="subscribe", tick=1.0)
topic : std.Iterate()
topics : std.Constantify(constant=["$SYS/#"])
qos : std.Constantify(constant=1)
subscriber : net.MQTTDynamicSubscriber(settings=SETTINGS)
printer : io.Print()
terminator : flow.Terminator()

"dummy_id" > client_id.in
"mqtt://test.mosquitto.org:1883" > uri.in
["$SYS/#"] > topics.in
QOS > qos.in
client_id.out > subscriber.client_id
uri.out > subscriber.uri
topics.out > topic.token
topic.item > subscriber.topic
qos.out > subscriber.qos
cmd.data > subscriber.cmd
subscriber.message > printer.token
topic.index > terminator.void