Files
nspanel-lovelace-ui/nspanel-lovelace-ui/rootfs/usr/bin/mqtt-manager/mqtt.py
2023-12-01 19:55:29 +01:00

68 lines
2.9 KiB
Python

from uuid import getnode as get_mac
import paho.mqtt.client as mqtt
import logging
import time
import json
import threading
class MqttManager:
def __init__(self, settings, msg_in_queue, msg_out_queue_list):
mqtt_client_name = "NSPanelLovelaceManager_" + str(get_mac())
self.client = mqtt.Client(mqtt_client_name)
self.msg_in_queue = msg_in_queue
self.msg_out_queue_list = msg_out_queue_list
self.settings = settings
self.client.on_connect = self.on_mqtt_connect
self.client.on_message = self.on_mqtt_message
self.client.username_pw_set(
settings["mqtt_username"], settings["mqtt_password"])
# Wait for connection
connection_return_code = 0
mqtt_server = settings["mqtt_server"]
mqtt_port = int(settings["mqtt_port"])
logging.info("Connecting to %s:%i as %s",
mqtt_server, mqtt_port, mqtt_client_name)
while True:
try:
self.client.connect(mqtt_server, mqtt_port, 5)
break # Connection call did not raise exception, connection is sucessfull
except: # pylint: disable=bare-except
logging.exception(
"Failed to connect to MQTT %s:%i. Will try again in 10 seconds. Code: %s", mqtt_server, mqtt_port, connection_return_code)
time.sleep(10.)
self.client.loop_start()
process_thread = threading.Thread(target=self.process_in_queue, args=(self.client, self.msg_in_queue))
process_thread.daemon = True
process_thread.start()
def on_mqtt_connect(self, client, userdata, flags, rc):
logging.info("Connected to MQTT Server")
# subscribe to panelRecvTopic of each panel
for settings_panel in self.settings["nspanels"].values():
client.subscribe(settings_panel["panelRecvTopic"])
def on_mqtt_message(self, client, userdata, msg):
try:
if msg.payload.decode() == "":
return
if msg.topic in self.msg_out_queue_list.keys():
data = json.loads(msg.payload.decode('utf-8'))
if "CustomRecv" in data:
queue = self.msg_out_queue_list[msg.topic]
queue.put(("MQTT:", data["CustomRecv"]))
else:
logging.debug("Received unhandled message on topic: %s", msg.topic)
except Exception: # pylint: disable=broad-exception-caught
logging.exception("Something went wrong during processing of message:")
try:
logging.error(msg.payload.decode('utf-8'))
except: # pylint: disable=bare-except
logging.error(
"Something went wrong when processing the exception message, couldn't decode payload to utf-8.")
def process_in_queue(self, client, msg_in_queue):
while True:
msg = msg_in_queue.get()
client.publish(msg[0], msg[1])