forked from ankraft/onem2m-jupyter-notebooks
/
NotificationServer.py
351 lines (277 loc) · 12.3 KB
/
NotificationServer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
#
# notificationServer.py
#
# (c) 2020 by Andreas Kraft
# License: BSD 3-Clause License. See the LICENSE file for further details.
#
# Simple base implementation of a notification server to handle notifications
# from a CSE.
#
from __future__ import annotations
from http.client import HTTPMessage
from typing import cast
from http.server import HTTPServer, BaseHTTPRequestHandler
import email.parser
import json, argparse, sys, ssl, signal, time
import cbor2
from rich.console import Console
from rich.syntax import Syntax
import pathlib, os
parent = pathlib.Path(os.path.abspath(os.path.dirname(__file__))).parent
sys.path.append(f'{parent}/ACME')
from acme.helpers.MQTTConnection import MQTTConnection, MQTTHandler
from acme.helpers.TextTools import toHex
from acme.etc.RequestUtils import serializeData
from acme.etc.DateUtils import getResourceDate
from acme.etc.Types import ContentSerializationType
from acme.etc.Constants import Constants as C
##############################################################################
#
# HTTP Server
#
# Default port of the http(s) server. It is used as the default value of the
# --port command line argument.
port = 9999 # Change this variable to specify another port.
# Colors used in the console markup: received notifications and status
# messages, sent responses, and errors.
messageColor = 'spring_green2'
messageColor2 = 'blue1'
errorColor = 'red'
# When set, notifications are answered with the response status code 4101
# instead of 2000 (see --fail-verification).
failVerification = False
# Number of seconds to wait before a response is sent. A false value (0)
# disables the delay (see --delay-response).
delayResponse:int = 0
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
    """ Request handler that prints received notifications to the console
        and acknowledges them.
    """

    # Media types whose content is pretty-printed as JSON or CBOR.
    _jsonTypes = ( 'application/json', 'application/vnd.onem2m-res+json' )
    _cborTypes = ( 'application/cbor', 'application/vnd.onem2m-res+cbor' )


    @staticmethod
    def _mediaType(contentType:str) -> str:
        """ Return the bare, lower-case media type of a Content-Type value.

            Parameters such as "; charset=utf-8" are removed so that the
            value can be compared against the known media types. An empty
            (or missing) value yields an empty string.
        """
        return contentType.split(';', 1)[0].strip().lower() if contentType else ''


    def do_GET(self) -> None:
        """ Just provide a simple web page.
        """
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write(bytes("<html><head><title>[ACME] Notification Server</title></head><body>This server doesn't provide a web page.</body></html>","utf-8"))


    def do_POST(self) -> None:
        """ Handle notification.

            The request headers are printed, the request is answered with
            http status 200, and then the content and the sent response
            headers are printed. The *X-M2M-RSC* response header is "2000",
            or "4101" when `failVerification` is set.
        """

        # Get headers and content data. A missing, malformed or negative
        # Content-Length is treated as an empty body instead of raising.
        try:
            length = max(int(self.headers['Content-Length']), 0)
        except (TypeError, ValueError):
            length = 0
        contentType = self._mediaType(self.headers.get('Content-Type', ''))
        requestID = self.headers['X-M2M-RI']
        post_data = self.rfile.read(length)

        # Print the content data
        console.print(f'[{messageColor}]### Received Notification (http)')
        console.print(self.headers, highlight = False)

        # delay response
        if delayResponse:
            console.print(f'[{messageColor}]Delaying response by {delayResponse}s')
            time.sleep(delayResponse)

        # Construct return header
        # Always acknowledge the verification requests
        self.send_response(200)
        self.send_header('X-M2M-RSC', '2000' if not failVerification else '4101')
        if requestID is not None:
            # Only echo the request identifier if the request contained one.
            self.send_header('X-M2M-RI', requestID)
        # Keep a reference to the header buffer before end_headers() flushes
        # it, so that the sent headers can be printed further below.
        _responseHeaders:list = self._headers_buffer # type:ignore [attr-defined]
        self.end_headers()

        self._printContent(contentType, post_data)

        # Print HTTP Response
        # This looks a bit more complicated but is necessary to render nicely in Jupyter
        console.print(f'[{messageColor2}]### Sent Notification Response (http)')
        console.print(email.parser.Parser(_class = HTTPMessage).parsestr(b''.join(_responseHeaders).decode('iso-8859-1')), highlight = False)


    def _printContent(self, contentType:str, data:bytes) -> None:
        """ Print the request content according to its media type.

            JSON and CBOR content is pretty-printed as JSON, everything else
            is printed as a hexdump. Content that cannot be decoded is
            reported on the console instead of raising an exception.
        """

        # Print JSON
        if contentType in self._jsonTypes:
            try:
                text = json.dumps(json.loads(data.decode('utf-8')), indent=4)
            except ValueError as e:	# includes JSON and unicode decode errors
                self._printDecodeError(contentType, e)
                console.print(toHex(data), highlight=False)
                return
            console.print(Syntax(text,
                                 'json',
                                 theme='monokai',
                                 line_numbers=False))

        # Print CBOR
        elif contentType in self._cborTypes:
            console.print('[dim]Content as Hexdump:\n')
            console.print(toHex(data), highlight=False)
            console.print('\n[dim]Content as JSON:\n')
            try:
                text = json.dumps(cbor2.loads(data), indent=4)
            except (ValueError, TypeError, EOFError, cbor2.CBORDecodeError) as e:
                self._printDecodeError(contentType, e)
                return
            console.print(Syntax(text,
                                 'json',
                                 theme='monokai',
                                 line_numbers=False))

        # Print other binary content
        else:
            console.print(toHex(data), highlight=False)


    def _printDecodeError(self, contentType:str, error:Exception) -> None:
        """ Print a message about content that could not be decoded.
        """
        console.print(f'[{errorColor}]Cannot decode content as {contentType}:')
        # The error text is printed without markup, because it may contain
        # square brackets.
        console.print(str(error), markup=False, highlight=False)


    def log_message(self, format:str, *args:int) -> None:
        """ Print the server's log messages to the console.
        """
        if (msg := format%args).startswith('"GET'):	return	# ignore GET log messages
        console.print(f'[{messageColor} reverse]{msg}', highlight = False)
##############################################################################
#
# MQTT Client
#
# Default list of MQTT topics to subscribe to. It is used as the default value
# of the --mqtt-topic command line argument. The request handler reads the
# last three topic elements of a ".../oneM2M/req/<from>/<to>/<encoding>" topic.
mqttNotificationTopic = [ '/oneM2M/req/id-in/+/#' ]
class MQTTClientHandler(MQTTHandler):
    """ Handler for the MQTT connection that prints received requests to the
        console and publishes a response for JSON and CBOR encoded requests.
    """

    def __init__(self, topic:str|list[str], enableLogging:bool) -> None:
        """ Initialize the handler.

            Args:
                topic: The topic(s) to subscribe to after connecting.
                enableLogging: Whether log messages of the MQTT connection are printed.
        """
        super().__init__()
        self.topic = topic
        self.enableLogging = enableLogging
        # Set by MQTTClient.shutdown() to mark an intended shutdown
        self.isShutdown = False


    def onConnect(self, connection:MQTTConnection) -> bool:
        """ Subscribe to the configured topic(s). A failure is reported via `onError()`.
        """
        try:
            connection.subscribeTopic(self.topic, self._requestCB)	# Subscribe to general requests
        except Exception as e:
            self.onError(connection, -1, f'MQTT {str(e)}')
        return True


    def onError(self, connection:MQTTConnection, rc:int=-1, msg:str|None=None) -> bool:
        """ Handle a connection error.

            The connection is shut down for the return codes 5 (authorization
            error) and -1 (unknown error). All other return codes are ignored.
        """
        if rc == 5:		# authorization error
            console.print(f'[{errorColor}]MQTT authorization error')
        elif rc == -1:	# unknown. probably connection error?
            if msg:
                console.print(f'[{errorColor}]{msg}')
        else:			# ignore all others
            return True
        connection.shutdown()
        console.print(f'[{messageColor}]MQTT client shutdown')
        return True


    def logging(self, connection:MQTTConnection, level:int, message:str) -> bool:
        """ Print a log message of the MQTT connection if logging is enabled.
        """
        if self.enableLogging:
            console.print(f'{level}: {message}')
        return True


    def onShutdown(self, connection: MQTTConnection) -> None:
        """ Terminate the whole server when the connection shut down unintentionally.
        """
        if not self.isShutdown:
            exitAll()


    def _requestCB(self, connection:MQTTConnection, topic:str, data:bytes) -> None:
        """ Print a received request and publish a response.

            A response is only published for JSON and CBOR encoded requests.
            Other content is just printed as a hexdump.
        """
        # TODO is it actually a notification? no -> do nothing, yes -> reply

        def _constructResponse(frm:str, to:str, jsn:dict) -> dict:
            responseData = { 	'fr': frm,
                                'to': to,
                                'rsc': 2000 if not failVerification else 4101,
                                'ot': getResourceDate()
                            }
            # Copy the request identifier and the release version, if present
            if (rqi := jsn.get('rqi')):
                responseData['rqi'] = rqi
            if (rvi := jsn.get('rvi')):
                responseData['rvi'] = rvi
            return responseData

        def _serializeResponse(jsn:dict, serialization:ContentSerializationType) -> bytes:
            # The response travels in the opposite direction: the request's
            # "to" is the response's "fr" and vice versa. Values missing in
            # the request are replaced by the ones taken from the topic.
            to = jsn.get('to', _to)
            frm = jsn.get('fr', _frm)
            return cast(bytes, serializeData(_constructResponse(to, frm, jsn), serialization))

        console.print(f'[{messageColor}]### Notification (MQTT)')
        console.print(f'Topic: {topic}')

        # Get originator, receiver and encoding from the topic
        topicArray = topic.split('/')
        if len(topicArray) > 4 and topicArray[-4] == 'req' and topicArray[-5] == 'oneM2M':
            _frm = topicArray[-3]
            _to = topicArray[-2]
            encoding = topicArray[-1]
        else:
            _frm = 'non-onem2m-entity'
            _to = 'unknown'
            encoding = 'json'

        responseData = None

        # Print JSON
        if encoding.upper() == 'JSON':
            console.print(Syntax(json.dumps((jsn := json.loads(data)), indent=4),
                                 'json',
                                 theme='monokai',
                                 line_numbers=False))
            responseData = _serializeResponse(jsn, ContentSerializationType.JSON)
            console.print(responseData)

        # Print CBOR
        elif encoding.upper() == 'CBOR':
            console.print('[dim]Content as Hexdump:\n')
            console.print(toHex(data), highlight=False)
            console.print('\n[dim]Content as JSON:\n')
            console.print(Syntax(json.dumps((jsn := cbor2.loads(data)), indent=4),
                                 'json',
                                 theme='monokai',
                                 line_numbers=False))
            responseData = _serializeResponse(jsn, ContentSerializationType.CBOR)

        # Print other binary content
        else:
            console.print('[dim]Content as Hexdump:\n')
            console.print(toHex(data), highlight=False)

        # TODO send a response
        if responseData:
            # delay response
            if delayResponse:
                console.print(f'[{messageColor}]Delaying response by {delayResponse}s')
                time.sleep(delayResponse)
            # The response topic is built from the elements of the request topic
            connection.publish(f'/oneM2M/resp/{_frm}/{_to}/{encoding}', responseData)
class MQTTClient(object):
    """ Wrapper around the MQTT connection that is used to receive notifications.
    """

    def __init__(self, args:argparse.Namespace) -> None:
        """ Create the MQTT connection from the parsed command line arguments.

            Args:
                args: Parsed arguments. The attributes `mqttAddress`, `mqttPort`,
                    `mqttUsername`, `mqttPassword`, `mqttTopic` and `mqttLogging` are read.
        """
        # Remember the broker address for the status message in run(), so
        # that the instance does not depend on a module-global "args".
        self.mqttAddress = args.mqttAddress
        self.mqttPort = args.mqttPort

        # Assume a couple of meaningful defaults here
        self.mqttConnection = MQTTConnection(address = args.mqttAddress,
                                             port = args.mqttPort,
                                             keepalive = 60,
                                             interface = '0.0.0.0',
                                             username = args.mqttUsername,
                                             password = args.mqttPassword,
                                             messageHandler = MQTTClientHandler (args.mqttTopic, args.mqttLogging))


    def run(self) -> None:
        """ Start the MQTT connection.
        """
        self.mqttConnection.run()
        console.print(f'[{messageColor}]MQTT client started. Connected to {self.mqttAddress}:{self.mqttPort}.')


    def shutdown(self) -> None:
        """ Shut down the MQTT connection.
        """
        if self.mqttConnection:
            # Mark the shutdown as intended, so that the handler's
            # onShutdown() does not terminate the whole server.
            self.mqttConnection.messageHandler.isShutdown = True 	# type:ignore [attr-defined]
            self.mqttConnection.shutdown()
        console.print(f'[{messageColor}]MQTT client stopped.')
##############################################################################
#
# Help with exiting and terminating all the threads programmatically
#
class ExitCommand(Exception):
    """ Exception that is raised by the signal handler in order to leave the
        server loop and to terminate the notification server.
    """
def exitSignalHandler(signal, frame) -> None: # type: ignore [no-untyped-def]
    """ Signal handler that raises an `ExitCommand` exception.

        Both arguments are passed by the signal machinery and are not used.
        Note that the *signal* argument hides the module of the same name
        inside this function.
    """
    raise ExitCommand
def exitAll() -> None:
    """ Send the SIGTERM signal to the own process.
    """
    ownProcessID = os.getpid()
    os.kill(ownProcessID, signal.SIGTERM)
def checkPositive(value:str) -> int:
    """ Convert a command line value to a positive integer.

        Args:
            value: The textual value to convert.

        Return:
            The value as an integer greater than zero.

        Raises:
            argparse.ArgumentTypeError: If the number is zero or negative.
            ValueError: If the value is not an integer number at all.
    """
    if (number := int(value)) > 0:
        return number
    raise argparse.ArgumentTypeError(f'{value} is an invalid positive int value')
# Turn SIGTERM into an ExitCommand exception, so that exitAll() can terminate
# the server from any thread.
signal.signal(signal.SIGTERM, exitSignalHandler)


if __name__ == '__main__':
    console = Console()
    console.print(f'\n{C.textLogo} - [bold]Notification Server[/bold]\n\n')

    # parse command line arguments
    parser = argparse.ArgumentParser()

    # mutually exclusive arguments for http
    groupHttp = parser.add_mutually_exclusive_group()
    groupHttp.add_argument('--http', action='store_false', dest='usehttps', default=None, help='run as http server (the default)')
    groupHttp.add_argument('--https', action='store_true', dest='usehttps', default=None, help='run as https server')

    parser.add_argument('--port', action='store', dest='port', default=port, type=int, help='specify the server port')
    parser.add_argument('--certfile', action='store', dest='certfile', required='--https' in sys.argv, metavar='<filename>', help='specify the certificate file for https')
    parser.add_argument('--keyfile', action='store', dest='keyfile', required='--https' in sys.argv, metavar='<filename>', help='specify the key file for https')

    # MQTT arguments
    parser.add_argument('--mqtt', action='store_true', dest='mqtt', default=False, help='enable MQTT for notifications')
    # NOTE(review): The address has a default, but is also required whenever
    # --mqtt is given, so the default is never used for a connection. Confirm
    # whether this is intended.
    parser.add_argument('--mqtt-address', action='store', dest='mqttAddress', default='localhost', required='--mqtt' in sys.argv, metavar='<host>', help='MQTT broker address (default: localhost)')
    # The port is converted to an integer, like the default value.
    parser.add_argument('--mqtt-port', action='store', dest='mqttPort', default=1883, type=int, metavar='<port>', help='MQTT broker port (default: 1883)')
    parser.add_argument('--mqtt-topic', action='store', dest='mqttTopic', default=mqttNotificationTopic, metavar='<topic>', nargs='+', help=f'MQTT topic list (default: {mqttNotificationTopic})')
    parser.add_argument('--mqtt-username', action='store', dest='mqttUsername', default=None, metavar='<username>', help='MQTT username (default: None)')
    # A password is required as soon as a username is given.
    parser.add_argument('--mqtt-password', action='store', dest='mqttPassword', default=None, required='--mqtt-username' in sys.argv, metavar='<password>', help='MQTT password (default: None)')
    parser.add_argument('--mqtt-logging', action='store_true', dest='mqttLogging', default=False, help='MQTT enable logging (default: disabled)')

    # Generic
    parser.add_argument('--fail-verification', action='store_true', dest='failVerification', default=False, help='Fail verification requests (default: false)')
    parser.add_argument('--delay-response', action='store', dest='delayResponse', nargs='?', const=60, type=checkPositive, help='Delay response (default: 60s)')

    args = parser.parse_args()

    # Generic arguments
    failVerification = args.failVerification
    delayResponse = args.delayResponse

    # run http(s) server
    httpd = HTTPServer(('', args.port), SimpleHTTPRequestHandler)

    if args.usehttps:
        # init ssl socket
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)					# Create a SSL Context
        context.load_cert_chain(args.certfile, args.keyfile)				# Load the certificate and private key
        httpd.socket = context.wrap_socket(httpd.socket, server_side=True)	# wrap the original http server socket as an SSL/TLS socket

    console.print(f'[{messageColor}]Notification server started.')

    try:
        # run mqtt client
        mqttClient = None
        if args.mqtt:
            mqttClient = MQTTClient(args)
            mqttClient.run()

        # run http server
        console.print(f'[{messageColor}]Listening for http(s) connections on port {args.port}.')
        httpd.serve_forever()
    except KeyboardInterrupt:
        if mqttClient:
            mqttClient.shutdown()
    except ExitCommand:
        # Raised by the SIGTERM handler, e.g. after the MQTT connection shut down
        pass
    except Exception:
        console.print()
        console.print_exception()
    finally:
        # Release the listening socket explicitly
        httpd.server_close()
        console.print(f'[{messageColor}]Notification server stopped.')