Skip to content

Commit

Permalink
Merge pull request #9 from sergeymaysak/cloud_push
Browse files Browse the repository at this point in the history
Introduced cloud push.
  • Loading branch information
sergeymaysak committed May 22, 2021
2 parents 28b858e + 6fabf46 commit 8b14684
Show file tree
Hide file tree
Showing 10 changed files with 483 additions and 44 deletions.
22 changes: 22 additions & 0 deletions README.md
Expand Up @@ -66,6 +66,25 @@ succeed = api.install_push_notification(1, notifications, False)
# if you have multiple tag managers you need specify its 'mac' stored in each tag as following
succeed = api.install_push_notification(sensor.tag_id, notifications, False,
sensor.tag_manager_mac)

```

## Monitor push events from cloud

Wireless Sensor Tags platform allows to monitor state of sensors with push notification on change right from cloud.

```python

import wirelesstagspy

api = wirelesstagpy.WirelessTags(username='login_email', password='your_password')
def callback(sensors):
print("updated sensors: {}".format(sensors))

# starts long running thread with getting updates from cloud immidiately when change happens.
# it is not a polling, but rather a similar to WebSockets get update logic
api.start_monitoring(callback)

```

## Arm/Disarm sensor monitoring for specific event
Expand Down Expand Up @@ -99,6 +118,7 @@ You can get list of supported sensors and binary events by calling `tag.allowed_
Also you can query tag on list of supported monitoring conditions that can be represented as switches to be armed/disarmed by calling `tag.allowed_monitoring_types`.

Handling sensors:

```python

import wirelesstagpy
Expand All @@ -116,6 +136,7 @@ for (uuid, tag) in tags.items():
```

Handling binary events:

```python

import wirelesstagpy
Expand All @@ -133,6 +154,7 @@ for (uuid, tag) in tags.items():
```

Use binary events to build custom push notifications configurations

```python

import wirelesstagpy
Expand Down
25 changes: 25 additions & 0 deletions playground.py
@@ -0,0 +1,25 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-

"""WirelessTags examples runner."""
import asyncio
import wirelesstagpy

loop = asyncio.get_event_loop()
platform = wirelesstagpy.WirelessTags(username='email', password='passwd')

def callback(tags, events):
"""Callback on cloud push update."""
print("updated tags: {}, triggered events: {}".format(tags, events))

platform.start_monitoring(callback)
print("started push updates")

try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
platform.stop_monitoring()
print('stopped monitoring')
loop.close()
4 changes: 3 additions & 1 deletion requirements_test.txt
Expand Up @@ -10,4 +10,6 @@ pytest-sugar>=0.8.0
pytest-timeout>=1.0.0
restructuredtext-lint>=1.0.1
pygments>=2.2.0
requests_mock>=1.3.0
requests_mock>=1.3.0
tl.testing==0.5
coverage==5.5
63 changes: 63 additions & 0 deletions test/mock/__init__.py
Expand Up @@ -552,3 +552,66 @@
"disabled": False,
"nat": True
}

CLOUD_PUSH_UPDATE_RESPONSE = '''<?xml version="1.0" encoding="utf-8"?>
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<soap:Body>
<GetNextUpdateForAllManagersOnDB2Response xmlns="http://mytaglist.com/ethComet">
<GetNextUpdateForAllManagersOnDB2Result>[{"dbid":2,"notificationJS":null,"name":"Weather Station","uuid":"fake-1111-2222-4444-111111111111","comment":"","slaveId":11,"tagType":32,"discon":null,"lastComm":132655886150468311,"alive":true,"signaldBm":-76,"batteryVolt":3.1116966701471327,"beeping":false,"lit":false,"migrationPending":false,"beepDurationDefault":5,"eventState":3,"tempEventState":1,"OutOfRange":false,"tempSpurTh":28,"lux":0,"temperature":18.036840438842773,"tempCalOffset":0,"capCalOffset":0,"image_md5":null,"cap":41.00469970703125,"capRaw":0,"az2":0,"capEventState":2,"lightEventState":3,"shorted":false,"zmod":null,"thermostat":null,"playback":null,"postBackInterval":600,"rev":159,"version1":2,"freqOffset":-9,"freqCalApplied":6595,"reviveEvery":4,"oorGrace":2,"tempBL":null,"capBL":null,"luxBL":null,"LBTh":2.67,"enLBN":true,"txpwr":26,"rssiMode":true,"ds18":false,"v2flag":16,"batteryRemaining":0.99}]</GetNextUpdateForAllManagersOnDB2Result>
</GetNextUpdateForAllManagersOnDB2Response>
</soap:Body>
</soap:Envelope>'''

BEFORE_CLOUD_PUSH_SENSOR_INFO = {
"dbid": 2,
"notificationJS": None,
"name": "Weather Station",
"uuid": "fake-1111-2222-4444-111111111111",
"comment": "",
"slaveId": 11,
"tagType": 32,
"discon": None,
"lastComm": 132655886150468311,
"alive": True,
"signaldBm": -76,
"batteryVolt": 2,
"beeping": False,
"lit": False,
"migrationPending": False,
"beepDurationDefault": 5,
"eventState": 0,
"tempEventState": 2,
"OutOfRange": True,
"tempSpurTh": 28,
"lux": 0,
"temperature": 18.036840438842773,
"tempCalOffset": 0,
"capCalOffset": 0,
"image_md5": None,
"cap": 0,
"capRaw": 0,
"az2": 0,
"capEventState": 3,
"lightEventState": 2,
"shorted": False,
"zmod": None,
"thermostat": None,
"playback": None,
"postBackInterval": 600,
"rev": 159,
"version1": 2,
"freqOffset": -9,
"freqCalApplied": 6595,
"reviveEvery": 4,
"oorGrace": 2,
"tempBL": None,
"capBL": None,
"luxBL": None,
"LBTh": 2.67,
"enLBN": True,
"txpwr": 26,
"rssiMode": True,
"ds18": False,
"v2flag": 16,
"batteryRemaining": 0.99
}
110 changes: 110 additions & 0 deletions test/test_cloud_push.py
@@ -0,0 +1,110 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-

"""Module with tests for cloud push API in wirelesstags platform."""

import json
from xml.etree import ElementTree

import test.mock as MOCK
import requests_mock
import tl.testing.thread

import wirelesstagpy
import wirelesstagpy.constants as CONST
from wirelesstagpy.binaryevent import (
BinaryEvent
)

USERNAME = 'foobar'
PASSWORD = 'deadbeef'


class TestCloudPush(tl.testing.thread.ThreadAwareTestCase):
"""Tests for Cloud Push logic."""

def setUp(self):
"""Set up wirelesstags platform module."""
self.platform = wirelesstagpy.WirelessTags(username=USERNAME, password=PASSWORD)

def tearDown(self):
"""Clean up after each test."""
self.platform = None

@requests_mock.mock()
def test_cloud_push(self, m):
"""Test cloud push logic."""
m.post(CONST.SIGN_IN_URL, text=MOCK.LOGIN_RESPONSE)
m.post(CONST.IS_SIGNED_IN_URL, text=MOCK.LOGIN_RESPONSE)
m.post(CONST.REQUEST_CLOUD_PUSH_UPDATE_URL, text=MOCK.CLOUD_PUSH_UPDATE_RESPONSE)

local_platform = self.platform

def push_callback(tags, events):
"""Local push callback."""
self.assertTrue(local_platform.is_monitoring)
self.assertTrue(len(tags) == 1)
self.assertTrue(len(events) == 0)
local_platform.stop_monitoring()
self.assertFalse(local_platform.is_monitoring)

with tl.testing.thread.ThreadJoiner(1):
self.platform.start_monitoring(push_callback)
# try to run it again
if self.platform.is_monitoring is True:
self.platform.start_monitoring(push_callback)
self.assertFalse(self.platform.is_monitoring)
self.platform.stop_monitoring()
self.assertFalse(self.platform.is_monitoring)

@requests_mock.mock()
def test_cloud_push_failed(self, m):
"""Test cloud push logic."""
m.post(CONST.SIGN_IN_URL, text=MOCK.LOGIN_RESPONSE)
m.post(CONST.IS_SIGNED_IN_URL, text=MOCK.LOGIN_RESPONSE)
m.post(CONST.REQUEST_CLOUD_PUSH_UPDATE_URL, status_code=500)

def push_callback(tags, events):
pass

with tl.testing.thread.ThreadJoiner(1):
self.platform.start_monitoring(push_callback)
self.platform.stop_monitoring()
self.assertFalse(self.platform.is_monitoring)

@requests_mock.mock()
def test_binary_event_arrived(self, m):
"""Test binary event."""
m.post(CONST.SIGN_IN_URL, text=MOCK.LOGIN_RESPONSE)
m.post(CONST.IS_SIGNED_IN_URL, text=MOCK.LOGIN_RESPONSE)
m.post(CONST.REQUEST_CLOUD_PUSH_UPDATE_URL, text=MOCK.CLOUD_PUSH_UPDATE_RESPONSE)

tag = wirelesstagpy.SensorTag(MOCK.BEFORE_CLOUD_PUSH_SENSOR_INFO, self.platform)
self.platform._tags["fake-1111-2222-4444-111111111111"] = tag # pylint: disable=protected-access
binary_event = BinaryEvent.make_state_event(tag)
self.assertIsNone(binary_event)

local_platform = self.platform

def push_callback(tags, events):
self.assertTrue(len(events) == 1)
arrived_events = events["fake-1111-2222-4444-111111111111"]
self.assertTrue(len(arrived_events) == 7)
event = arrived_events[0]
self.assertIsNotNone(str(event))
local_platform.stop_monitoring()

with tl.testing.thread.ThreadJoiner(1):
self.platform.start_monitoring(push_callback)

def test_soup_parsing(self):
"""Test for parsing arrived soap payload on cloud push."""
root = ElementTree.fromstring(MOCK.CLOUD_PUSH_UPDATE_RESPONSE)
raw_tags = root.find(CONST.CLOUD_PUSH_XPATH)
tags = json.loads(raw_tags.text)
self.assertTrue(len(tags) == 1)

def test_binary_event_factory(self):
"""Test binary event factory methods."""
event = BinaryEvent.make_event(None, None)
self.assertIsNone(event)
94 changes: 86 additions & 8 deletions wirelesstagpy/__init__.py
Expand Up @@ -18,14 +18,17 @@
see www.wirelesstag.net for more information.
I am in no way affiliated with Cao Gadgets LLC.
Copyrights: (c) 2018 Sergiy Maysak, see LICENSE file for details
Copyrights: (c) 2018-2021 Sergiy Maysak, see LICENSE file for details
Creation Date: 3/7/2018.
"""

import json
import time
from datetime import datetime
import logging
from xml.etree import ElementTree
from threading import Thread
from threading import Lock
import requests

from wirelesstagpy.sensortag import SensorTag
Expand Down Expand Up @@ -65,6 +68,10 @@ def __init__(self, username, password):
# array of tag managers mac addresses
self.mac_addresses = []

self._is_cloud_push_active = False
self._update_lock = Lock()
self._thread = None

def load_tags(self):
"""Load all registered tags."""
if self._needs_reload:
Expand All @@ -78,12 +85,7 @@ def load_tags(self):

json_tags_spec = response.json()
tags = json_tags_spec['d']
for tag in tags:
uuid = tag['uuid']
# save mac - a unique identifier of specific tag manager
mac = tag['mac'] if 'mac' in tag else None
self._tags[uuid] = SensorTag(tag, self, mac)
self._register_mac(mac)
self._update_tags(tags)
_LOGGER.info("Tags reloaded at: %s", datetime.now())
except Exception as error:
_LOGGER.error("failed to load tags - %s", error)
Expand Down Expand Up @@ -182,6 +184,82 @@ def fetch_push_notifications(self, tag_id, tag_manager_mac=None):
_LOGGER.error("failed to fetch : %s - %s", tag_id, error)
return notifications

@property
def is_monitoring(self):
"""Return if cloud push monitoring is active or not."""
return self._is_cloud_push_active

def start_monitoring(self, handler):
"""Start monitoring of state changes with push from cloud."""
if self._is_cloud_push_active is True:
return

self._is_cloud_push_active = True
self._start_monitoring_thread(handler)

def stop_monitoring(self):
"""Stop monitoring of state changes from cloud."""
if self._is_cloud_push_active is False:
return

self._is_cloud_push_active = False
self._thread.join(timeout=1)

def _start_monitoring_thread(self, handler):
"""Start working thread for monitoring cloud push."""
def _cloud_push_worker(handler):
"""Worker function for thread."""
while True:
if not self._is_cloud_push_active:
break
try:
tags_updated = self._request_push_update()
if tags_updated is not None and len(tags_updated) > 0:
updated_tags, events = self._update_tags(tags_updated)
handler(updated_tags, events)
except Exception as error:
_LOGGER.error("failed to get cloud push: %s",
error)

self._thread = Thread(target=_cloud_push_worker, args=(handler, ))
self._thread.start()

def _request_push_update(self):
"""Request push update for tags states."""
cookies = self._auth_cookies
tags_updated = []
try:
payload = CONST.SOAP_CLOUD_PUSH_PAYLOAD
headers = CONST.SOAP_CLOUD_PUSH_HEADERS
response = requests.post(
CONST.REQUEST_CLOUD_PUSH_UPDATE_URL, headers=headers,
cookies=cookies, data=payload)
root = ElementTree.fromstring(response.content)
raw_tags = root.find(CONST.CLOUD_PUSH_XPATH)
tags_updated = json.loads(raw_tags.text)
except Exception as error:
_LOGGER.error("failed to fetch push update: %s", error)
return tags_updated

def _update_tags(self, tags):
"""Update tags arrived from server."""
updated_tags = {}
binary_events = {}
with self._update_lock:
for tag in tags:
uuid = tag['uuid']
# save mac - a unique identifier of specific tag manager
mac = tag['mac'] if 'mac' in tag else None
new_tag = SensorTag(tag, self, mac)
old_tag = self._tags[uuid] if uuid in self._tags else None
detected_events = new_tag.detected_events(old_tag)
if detected_events is not None and len(detected_events) > 0:
binary_events[uuid] = detected_events
self._tags[uuid] = new_tag
self._register_mac(mac)
updated_tags[uuid] = new_tag
return (updated_tags, binary_events)

def _arm_control_tag(self, tag_id, url, tag_manager_mac=None, own_payload=None):
"""Arm sensor with specified id and url to monitor changes."""
cookies = self._auth_cookies
Expand Down Expand Up @@ -226,7 +304,7 @@ def _login(self):
_LOGGER.debug("Failed to login to %s - %s", CONST.BASEURL, error)
self._cookies = None
raise WirelessTagsException("Unable to login to wirelesstags.net"
" - check your credentials")
" - check your credentials") from error

_LOGGER.info("Login successful")

Expand Down

0 comments on commit 8b14684

Please sign in to comment.