1
+ """
2
+ simple mqtt console with textual and aiomqtt
3
+ """
4
+ from aiomqtt import Client
5
+ from textual import work , on
6
+ from textual .app import App , ComposeResult
7
+ from textual .containers import Horizontal
8
+ from textual .widgets import Header , Footer , RichLog , Input
9
+ from textual .binding import Binding
10
+
11
+ try :
12
+ from config import MQTT_HOST , MQTT_PORT , CLIENT_ID , MQTT_USER , MQTT_PW
13
+ except ModuleNotFoundError as _ :
14
+ MQTT_HOST = 'fill in your mqtt host here'
15
+ MQTT_PORT = 'add your mqtt port here'
16
+ CLIENT_ID = 'put your client id here'
17
+ # also set user and password if mqtt server needs it
18
+ MQTT_USER = None
19
+ MQTT_PW = None
20
+
21
+ class MQTTConsole (App ):
22
+ """a simple mqtt console"""
23
+ TITLE = "MQTT Console"
24
+ BINDINGS = [Binding (key = "q" , action = "quit_mqtt_console" , description = "Quit App" ),
25
+ Binding (key = "c" , action = "clear_mqtt_console" , description = "Clear Console" ),]
26
+ #CSS_PATH = "console-tui.tcss"
27
+
28
+ client = None
29
+ # the topic you wanna publish to
30
+ topic = f"textualize/rules"
31
+
32
+ def compose (self ) -> ComposeResult :
33
+ yield Header (name = self .TITLE , show_clock = False )
34
+ with Horizontal ():
35
+ yield Input (placeholder = f"Publish mqtt topic on { self .topic } " )
36
+ yield RichLog ()
37
+ yield Footer ()
38
+
39
+ def on_mount (self ):
40
+ self .mqttWorker ()
41
+
42
+ @on (Input .Submitted )
43
+ async def input_changed (self , message : Input .Changed ) -> None :
44
+ await self .client .publish (self .topic , f"{ message .value } " )
45
+
46
+ @work (exclusive = False )
47
+ async def mqttWorker (self ):
48
+ async with Client (MQTT_HOST , port = MQTT_PORT , identifier = CLIENT_ID , username = MQTT_USER , password = MQTT_PW ) as self .client :
49
+ ## subscribe to the topic you also publishing
50
+ await self .client .subscribe (self .topic )
51
+ ## tasmota plugs
52
+ await self .client .subscribe ("tele/#" )
53
+ #await self.client.subscribe("tasmota/discovery/#")
54
+ ## subscribe to all
55
+ #await self.client.subscribe("#")
56
+
57
+ async for message in self .client .messages :
58
+ t = message .topic .value
59
+ try :
60
+ msg = message .payload .decode ('utf-8' )
61
+ except UnicodeDecodeError as _ :
62
+ msg = "couldn't decode message"
63
+ self .query_one (RichLog ).write (f"{ t } , { msg } " )
64
+
65
+ def action_clear_mqtt_console (self ) -> None :
66
+ self .query_one (RichLog ).clear ()
67
+
68
+ def action_quit_mqtt_console (self ) -> None :
69
+ self .app .exit ()
70
+
71
+ if __name__ == "__main__" :
72
+ app = MQTTConsole ()
73
+ app .run ()
0 commit comments