added error handling and improved logging

This commit is contained in:
Johannes Braun
2026-02-21 18:44:23 +01:00
parent 42c27a3794
commit cf396b0259
6 changed files with 330 additions and 215 deletions

View File

@@ -475,7 +475,7 @@ class AlarmCard(HACard):
main_entity = self.entities[0]
main_entity.render()
print(main_entity.state)
logging.debug("Alarm card state for '%s': %s", main_entity.entity_id, main_entity.state)
icon = get_icon_char("shield-off")
color = rgb_dec565([255,255,255])

View File

@@ -8,6 +8,8 @@ def wait_for_ha_cache():
while time.time() < mustend:
if len(libs.home_assistant.home_assistant_entity_state_cache) == 0:
time.sleep(0.1)
if len(libs.home_assistant.home_assistant_entity_state_cache) == 0:
logging.warning("Home Assistant entity cache is still empty after waiting 5 seconds")
time.sleep(1)
def calculate_dim_values(sleepTracking, sleepTrackingZones, sleepBrightness, screenBrightness, sleepOverride, return_involved_entities=False):
@@ -28,8 +30,8 @@ def calculate_dim_values(sleepTracking, sleepTrackingZones, sleepBrightness, scr
involved_entities.append(sleepBrightness)
try:
dimmode = int(float(libs.home_assistant.get_entity_data(sleepBrightness).get('state', 10)))
except ValueError:
print("sleepBrightness entity invalid")
except (TypeError, ValueError):
logging.exception("sleepBrightness entity '%s' has an invalid state value", sleepBrightness)
if screenBrightness:
if isinstance(screenBrightness, int):
@@ -44,8 +46,8 @@ def calculate_dim_values(sleepTracking, sleepTrackingZones, sleepBrightness, scr
involved_entities.append(screenBrightness)
try:
dimValueNormal = int(float(libs.home_assistant.get_entity_data(screenBrightness).get('state', 100)))
except ValueError:
print("screenBrightness entity invalid")
except (TypeError, ValueError):
logging.exception("screenBrightness entity '%s' has an invalid state value", screenBrightness)
# force sleep brightness to zero in case sleepTracking is active
if sleepTracking:
if libs.home_assistant.is_existent(sleepTracking):
@@ -237,12 +239,19 @@ def handle_buttons(entity_id, btype, value, entity_config=None):
def call_ha_service(entity_id, service, service_data = {}):
etype = entity_id.split(".")[0]
libs.home_assistant.call_service(
ok = libs.home_assistant.call_service(
entity_name=entity_id,
domain=etype,
service=service,
service_data=service_data
)
if not ok:
logging.error(
"Home Assistant service call failed: entity='%s', service='%s', data=%s",
entity_id,
service,
service_data,
)
def button_press(entity_id, value):
etype = entity_id.split(".")[0]

View File

@@ -14,8 +14,9 @@ next_id = 0
request_all_states_id = 0
ws_connected = False
home_assistant_entity_state_cache = {}
template_cache = {}
response_buffer = {}
template_cache = {}
response_buffer = {}
nspanel_event_handler = None
ON_CONNECT_HANDLER = None
@@ -44,47 +45,64 @@ def register_on_disconnect_handler(handler):
ON_DISCONNECT_HANDLER = handler
def on_message(ws, message):
global auth_ok, request_all_states_id, home_assistant_entity_state_cache, response_buffer, template_cache
json_msg = json.loads(message)
if json_msg["type"] == "auth_required":
authenticate_client()
elif json_msg["type"] == "auth_ok":
auth_ok = True
logging.info("Home Assistant auth OK. Requesting existing states.")
subscribe_to_events()
_get_all_states()
if ON_CONNECT_HANDLER is not None:
ON_CONNECT_HANDLER()
# for templates
elif json_msg["type"] == "event" and json_msg["id"] in response_buffer:
template_cache[response_buffer[json_msg["id"]]] = {
"result": json_msg["event"]["result"],
"listener-entities": json_msg["event"]["listeners"]["entities"]
}
elif json_msg["type"] == "event" and json_msg["event"]["event_type"] == "state_changed":
entity_id = json_msg["event"]["data"]["entity_id"]
home_assistant_entity_state_cache[entity_id] = json_msg["event"]["data"]["new_state"]
send_entity_update(entity_id)
# rerender template
for template, template_cache_entry in template_cache.items():
if entity_id in template_cache_entry.get("listener-entities", []):
cache_template(template)
elif json_msg["type"] == "event" and json_msg["event"]["event_type"] == "esphome.nspanel.data":
nspanel_data_callback(json_msg["event"]["data"]["device_id"], json_msg["event"]["data"]["CustomRecv"])
elif json_msg["type"] == "result" and not json_msg["success"]:
logging.error("Failed result: ")
logging.error(json_msg)
elif json_msg["type"] == "result" and json_msg["success"]:
if json_msg["id"] == request_all_states_id:
for entity in json_msg["result"]:
home_assistant_entity_state_cache[entity["entity_id"]] = entity
else:
if json_msg["id"] in response_buffer and json_msg.get("result"):
response_buffer[json_msg["id"]] = json_msg["result"]
return None # Ignore success result messages
else:
logging.debug(message)
def on_message(ws, message):
global auth_ok, request_all_states_id, home_assistant_entity_state_cache, response_buffer, template_cache
try:
json_msg = json.loads(message)
except json.JSONDecodeError:
logging.exception("Failed to parse Home Assistant websocket message as JSON")
return
message_type = json_msg.get("type")
if message_type == "auth_required":
authenticate_client()
elif message_type == "auth_ok":
auth_ok = True
logging.info("Home Assistant auth OK. Requesting existing states.")
subscribe_to_events()
_get_all_states()
if ON_CONNECT_HANDLER is not None:
ON_CONNECT_HANDLER()
# for templates
elif message_type == "event" and json_msg.get("id") in response_buffer:
event = json_msg.get("event", {})
listeners = event.get("listeners", {})
template_cache[response_buffer[json_msg["id"]]] = {
"result": event.get("result"),
"listener-entities": listeners.get("entities", [])
}
elif message_type == "event" and json_msg.get("event", {}).get("event_type") == "state_changed":
event_data = json_msg.get("event", {}).get("data", {})
entity_id = event_data.get("entity_id")
if not entity_id:
logging.debug("Received state_changed event without entity_id")
return
home_assistant_entity_state_cache[entity_id] = event_data.get("new_state")
send_entity_update(entity_id)
# rerender template
for template, template_cache_entry in template_cache.items():
if entity_id in template_cache_entry.get("listener-entities", []):
cache_template(template)
elif message_type == "event" and json_msg.get("event", {}).get("event_type") == "esphome.nspanel.data":
event_data = json_msg.get("event", {}).get("data", {})
device_id = event_data.get("device_id")
custom_recv = event_data.get("CustomRecv")
if nspanel_event_handler is None:
logging.debug("No NsPanel event handler registered; dropping event for device '%s'", device_id)
return
nspanel_event_handler(device_id, custom_recv)
elif message_type == "result" and not json_msg.get("success"):
logging.error("Home Assistant request failed: %s", json_msg)
elif message_type == "result" and json_msg.get("success"):
if json_msg.get("id") == request_all_states_id:
for entity in json_msg.get("result", []):
home_assistant_entity_state_cache[entity["entity_id"]] = entity
else:
if json_msg.get("id") in response_buffer and json_msg.get("result"):
response_buffer[json_msg["id"]] = json_msg["result"]
return None # Ignore success result messages
else:
logging.debug(message)
def _ws_connection_open(ws):
@@ -95,20 +113,24 @@ def _ws_connection_open(ws):
ON_CONNECT_HANDLER()
def _ws_connection_close(ws, close_status_code, close_msg):
global ws_connected
ws_connected = False
logging.error("WebSocket connection closed!")
if ON_DISCONNECT_HANDLER is not None:
ON_DISCONNECT_HANDLER()
def _ws_connection_close(ws, close_status_code, close_msg):
global ws_connected
ws_connected = False
logging.error(
"WebSocket connection closed (status=%s, message=%s)",
close_status_code,
close_msg,
)
if ON_DISCONNECT_HANDLER is not None:
ON_DISCONNECT_HANDLER()
def connect():
Thread(target=_do_connection, daemon=True).start()
def _do_connection():
global home_assistant_url, ws, settings
def _do_connection():
global home_assistant_url, ws, settings
ws_url = home_assistant_url.replace(
"https://", "wss://").replace("http://", "ws://")
if settings["is_addon"]:
@@ -117,12 +139,15 @@ def _do_connection():
ws_url += "/api/websocket"
ws = websocket.WebSocketApp(F"{ws_url}", on_message=on_message,
on_open=_ws_connection_open, on_close=_ws_connection_close)
while True:
logging.info(F"Connecting to Home Assistant at {ws_url}")
ws.close()
time.sleep(1)
ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
time.sleep(10)
while True:
logging.info(F"Connecting to Home Assistant at {ws_url}")
try:
ws.close()
time.sleep(1)
ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
except Exception:
logging.exception("WebSocket connection loop failed")
time.sleep(10)
def authenticate_client():
@@ -144,9 +169,9 @@ def subscribe_to_events():
}
send_message(json.dumps(msg))
def subscribe_to_nspanel_events(nsp_callback):
global next_id, nspanel_data_callback
nspanel_data_callback = nsp_callback
def subscribe_to_nspanel_events(nsp_callback):
global next_id, nspanel_event_handler
nspanel_event_handler = nsp_callback
msg = {
"id": next_id,
"type": "subscribe_events",
@@ -168,11 +193,13 @@ def send_entity_update(entity_id):
global on_ha_update
on_ha_update(entity_id)
def nspanel_data_callback(device_id, msg):
global nspanel_data_callback
nspanel_data_callback(device_id, msg)
def nspanel_data_callback(device_id, msg):
if nspanel_event_handler is None:
logging.debug("NsPanel callback invoked before handler was registered")
return
nspanel_event_handler(device_id, msg)
def call_service(entity_name: str, domain: str, service: str, service_data: dict) -> bool:
def call_service(entity_name: str, domain: str, service: str, service_data: dict) -> bool:
global next_id
try:
msg = {
@@ -187,9 +214,12 @@ def call_service(entity_name: str, domain: str, service: str, service_data: dict
}
send_message(json.dumps(msg))
return True
except Exception as e:
logging.exception("Failed to call Home Assisatant service.")
return False
except Exception:
logging.exception(
"Failed to call Home Assistant service: %s.%s for %s",
domain, service, entity_name
)
return False
def send_msg_to_panel(service: str, service_data: dict) -> bool:
global next_id
@@ -203,9 +233,9 @@ def send_msg_to_panel(service: str, service_data: dict) -> bool:
}
send_message(json.dumps(msg))
return True
except Exception as e:
logging.exception("Failed to call Home Assisatant service.")
return False
except Exception:
logging.exception("Failed to call Home Assistant panel service: %s", service)
return False
def execute_script(entity_name: str, domain: str, service: str, service_data: dict) -> str:
global next_id, response_buffer
@@ -241,13 +271,13 @@ def execute_script(entity_name: str, domain: str, service: str, service_data: di
else:
return response_buffer[call_id]["response"]
raise TimeoutError("Did not recive respose in time to HA script call")
except Exception as e:
logging.exception("Failed to call Home Assisatant script.")
return {}
except Exception:
logging.exception("Failed to call Home Assistant script: %s.%s", domain, service)
return {}
def cache_template(template):
if not template:
raise Exception("Invalid template")
def cache_template(template):
if not template:
raise ValueError("Invalid template")
global next_id, response_buffer
try:
call_id = next_id
@@ -259,9 +289,9 @@ def cache_template(template):
}
send_message(json.dumps(msg))
return True
except Exception as e:
logging.exception("Failed to render template.")
return False
except Exception:
logging.exception("Failed to render template.")
return False
def get_template(template):
global template_cache
@@ -299,7 +329,12 @@ def is_existent(entity_id: str):
return False
def send_message(message):
global ws, next_id
next_id += 1
ws.send(message)
def send_message(message):
global ws, next_id
try:
next_id += 1
ws.send(message)
except NameError:
logging.error("WebSocket client is not initialized; dropping outgoing message")
except Exception:
logging.exception("Failed sending websocket message to Home Assistant")

View File

@@ -15,7 +15,7 @@ import sys
from queue import Queue
from mqtt import MqttManager
logging.getLogger("watchdog").propagate = False
logging.getLogger("watchdog").propagate = False
settings = {}
panels = {}
@@ -25,61 +25,76 @@ last_settings_file_mtime = 0
mqtt_connect_time = 0
has_sent_reload_command = False
logging.basicConfig(level=logging.DEBUG)
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s %(levelname)s [%(threadName)s] %(name)s: %(message)s",
)
def on_ha_update(entity_id):
global panel_in_queues
# send HA updates to all panels
for queue in panel_in_queues.values():
queue.put(("HA:", entity_id))
def on_ha_update(entity_id):
global panel_in_queues
# send HA updates to all panels
for queue in panel_in_queues.values():
try:
queue.put(("HA:", entity_id))
except Exception:
logging.exception("Failed to enqueue HA update for entity '%s'", entity_id)
def on_ha_panel_event(device_id, msg):
global panel_in_queues
def on_ha_panel_event(device_id, msg):
global panel_in_queues
if device_id in panel_in_queues.keys():
queue = panel_in_queues[device_id]
try:
queue.put(("MQTT:", msg))
except Exception:
logging.exception("Failed to enqueue panel event for device '%s'", device_id)
if device_id in panel_in_queues.keys():
queue = panel_in_queues[device_id]
queue.put(("MQTT:", msg))
def process_output_to_panel():
while True:
msg = panel_out_queue.get()
#client.publish(msg[0], msg[1])
#apis.ha_api.call_service(service="esphome/" + self._api_panel_name + "_nspanelui_api_call", command=2, data=msg)
service = msg[0] + "_nspanelui_api_call"
service_data = {
"data": msg[1],
"command":2
}
libs.home_assistant.send_msg_to_panel(
service = service,
service_data = service_data
)
def process_output_to_panel():
while True:
try:
msg = panel_out_queue.get()
service = msg[0] + "_nspanelui_api_call"
service_data = {
"data": msg[1],
"command": 2
}
libs.home_assistant.send_msg_to_panel(
service=service,
service_data=service_data
)
except Exception:
logging.exception("Failed to process outgoing panel message")
def connect():
global settings, panel_out_queue
if "mqtt_server" in settings and not "use_ha_api" in settings:
MqttManager(settings, panel_out_queue, panel_in_queues)
else:
logging.info("MQTT values not configured, will not connect.")
# MQTT Connected, start APIs if configured
if settings["home_assistant_address"] != "" and settings["home_assistant_token"] != "":
libs.home_assistant.init(settings, on_ha_update)
libs.home_assistant.connect()
else:
logging.info("Home Assistant values not configured, will not connect.")
while not libs.home_assistant.ws_connected:
time.sleep(1)
def connect():
global settings, panel_out_queue
ha_is_configured = settings["home_assistant_address"] != "" and settings["home_assistant_token"] != ""
if "mqtt_server" in settings and not "use_ha_api" in settings:
MqttManager(settings, panel_out_queue, panel_in_queues)
else:
logging.info("MQTT values not configured, will not connect.")
# MQTT Connected, start APIs if configured
if ha_is_configured:
libs.home_assistant.init(settings, on_ha_update)
libs.home_assistant.connect()
else:
logging.info("Home Assistant values not configured, will not connect.")
return
wait_seconds = 0
while not libs.home_assistant.ws_connected:
wait_seconds += 1
if wait_seconds % 10 == 0:
logging.info("Waiting for Home Assistant websocket connection... (%ss)", wait_seconds)
time.sleep(1)
if settings.get("use_ha_api"):
libs.home_assistant.subscribe_to_nspanel_events(on_ha_panel_event)
send_to_panel_thread = threading.Thread(target=process_output_to_panel, args=())
send_to_panel_thread.daemon = True
send_to_panel_thread.start()
def setup_panels():
def setup_panels():
global settings, panel_in_queues
# Create NsPanel object
for name, settings_panel in settings["nspanels"].items():
@@ -92,18 +107,25 @@ def setup_panels():
msg_in_queue = Queue(maxsize=20)
panel_in_queues[settings_panel["panelRecvTopic"]] = msg_in_queue
panel_thread = threading.Thread(target=panel_thread_target, args=(msg_in_queue, name, settings_panel, panel_out_queue))
panel_thread.daemon = True
panel_thread.start()
panel_thread = threading.Thread(target=panel_thread_target, args=(msg_in_queue, name, settings_panel, panel_out_queue))
panel_thread.daemon = True
panel_thread.start()
def panel_thread_target(queue_in, name, settings_panel, queue_out):
panel = LovelaceUIPanel(name, settings_panel, queue_out)
while True:
msg = queue_in.get()
if msg[0] == "MQTT:":
panel.customrecv_event_callback(msg[1])
elif msg[0] == "HA:":
panel.ha_event_callback(msg[1])
def panel_thread_target(queue_in, name, settings_panel, queue_out):
try:
panel = LovelaceUIPanel(name, settings_panel, queue_out)
except Exception:
logging.exception("Failed to initialize panel thread for '%s'", name)
return
while True:
try:
msg = queue_in.get()
if msg[0] == "MQTT:":
panel.customrecv_event_callback(msg[1])
elif msg[0] == "HA:":
panel.ha_event_callback(msg[1])
except Exception:
logging.exception("Panel thread '%s' failed while handling queue message", name)
def get_config_file():
CONFIG_FILE = os.getenv('CONFIG_FILE')
@@ -111,25 +133,34 @@ def get_config_file():
CONFIG_FILE = './panels.yaml'
return CONFIG_FILE
def get_config(file):
def get_config(file):
global settings
try:
with open(file, 'r', encoding="utf8") as file:
settings = yaml.safe_load(file)
except yaml.YAMLError as exc:
print ("Error while parsing YAML file:")
if hasattr(exc, 'problem_mark'):
if exc.context != None:
print (' parser says\n' + str(exc.problem_mark) + '\n ' +
str(exc.problem) + ' ' + str(exc.context) +
'\nPlease correct data and retry.')
else:
print (' parser says\n' + str(exc.problem_mark) + '\n ' +
str(exc.problem) + '\nPlease correct data and retry.')
else:
print ("Something went wrong while parsing yaml file")
return False
except FileNotFoundError:
logging.error("Config file not found: %s", file)
return False
except OSError:
logging.exception("Failed reading config file: %s", file)
return False
except yaml.YAMLError as exc:
logging.error("Error while parsing YAML file: %s", file)
if hasattr(exc, 'problem_mark'):
if exc.context != None:
logging.error(
"Parser says\n%s\n%s %s\nPlease correct data and retry.",
str(exc.problem_mark), str(exc.problem), str(exc.context)
)
else:
logging.error(
"Parser says\n%s\n%s\nPlease correct data and retry.",
str(exc.problem_mark), str(exc.problem)
)
else:
logging.exception("Something went wrong while parsing yaml file")
return False
if not settings.get("mqtt_username"):
settings["mqtt_username"] = os.getenv('MQTT_USER')
@@ -151,7 +182,7 @@ def get_config(file):
settings["is_addon"] = True
return True
def config_watch():
def config_watch():
class ConfigChangeEventHandler(FileSystemEventHandler):
def __init__(self, base_paths):
self.base_paths = base_paths
@@ -162,20 +193,24 @@ def config_watch():
super(ConfigChangeEventHandler, self).dispatch(event)
return
def on_modified(self, event):
logging.info('Modification detected. Reloading panels.')
pid = os.getpid()
os.kill(pid, signal.SIGTERM)
def on_modified(self, event):
logging.info('Modification detected. Reloading panels.')
pid = os.getpid()
os.kill(pid, signal.SIGTERM)
logging.info('Watching for changes in config file')
project_files = []
project_files.append(get_config_file())
handler = ConfigChangeEventHandler(project_files)
observer = Observer()
observer.schedule(handler, path=os.path.dirname(get_config_file()), recursive=True)
observer.start()
while True:
time.sleep(1)
watch_path = os.path.dirname(get_config_file()) or "."
observer.schedule(handler, path=watch_path, recursive=True)
observer.start()
while True:
try:
time.sleep(1)
except Exception:
logging.exception("Config watch loop failed")
def signal_handler(signum, frame):
logging.info(f"Received signal {signum}. Initiating restart...")
@@ -194,4 +229,4 @@ if __name__ == '__main__':
time.sleep(100)
else:
while True:
time.sleep(100)
time.sleep(100)

View File

@@ -19,7 +19,6 @@ class MqttManager:
self.client.username_pw_set(
settings["mqtt_username"], settings["mqtt_password"])
# Wait for connection
connection_return_code = 0
mqtt_server = settings["mqtt_server"]
mqtt_port = int(settings["mqtt_port"])
logging.info("Connecting to %s:%i as %s",
@@ -28,9 +27,12 @@ class MqttManager:
try:
self.client.connect(mqtt_server, mqtt_port, 5)
break # Connection call did not raise exception, connection is sucessfull
except: # pylint: disable=bare-except
except Exception: # pylint: disable=broad-exception-caught
logging.exception(
"Failed to connect to MQTT %s:%i. Will try again in 10 seconds. Code: %s", mqtt_server, mqtt_port, connection_return_code)
"Failed to connect to MQTT %s:%i. Will try again in 10 seconds.",
mqtt_server,
mqtt_port,
)
time.sleep(10.)
self.client.loop_start()
process_thread = threading.Thread(target=self.process_in_queue, args=(self.client, self.msg_in_queue))
@@ -38,31 +40,52 @@ class MqttManager:
process_thread.start()
def on_mqtt_connect(self, client, userdata, flags, rc):
if rc != 0:
logging.error("MQTT connection failed with return code: %s", rc)
return
logging.info("Connected to MQTT Server")
# subscribe to panelRecvTopic of each panel
for settings_panel in self.settings["nspanels"].values():
client.subscribe(settings_panel["panelRecvTopic"])
topic = settings_panel["panelRecvTopic"]
result, _ = client.subscribe(topic)
if result == mqtt.MQTT_ERR_SUCCESS:
logging.debug("Subscribed to panel topic: %s", topic)
else:
logging.error("Failed to subscribe to panel topic '%s' (result=%s)", topic, result)
def on_mqtt_message(self, client, userdata, msg):
try:
if msg.payload.decode() == "":
payload_text = msg.payload.decode('utf-8')
if payload_text == "":
logging.debug("Ignoring empty MQTT payload on topic: %s", msg.topic)
return
if msg.topic in self.msg_out_queue_list.keys():
data = json.loads(msg.payload.decode('utf-8'))
data = json.loads(payload_text)
if "CustomRecv" in data:
queue = self.msg_out_queue_list[msg.topic]
queue.put(("MQTT:", data["CustomRecv"]))
else:
logging.debug("JSON payload on topic '%s' has no 'CustomRecv' key", msg.topic)
else:
logging.debug("Received unhandled message on topic: %s", msg.topic)
except UnicodeDecodeError:
logging.exception("Failed to decode MQTT payload as UTF-8 on topic: %s", msg.topic)
except json.JSONDecodeError:
logging.exception("Failed to parse MQTT JSON payload on topic: %s", msg.topic)
except Exception: # pylint: disable=broad-exception-caught
logging.exception("Something went wrong during processing of message:")
logging.exception("Unexpected error while processing MQTT message on topic: %s", msg.topic)
try:
logging.error(msg.payload.decode('utf-8'))
except: # pylint: disable=bare-except
except Exception: # pylint: disable=broad-exception-caught
logging.error(
"Something went wrong when processing the exception message, couldn't decode payload to utf-8.")
def process_in_queue(self, client, msg_in_queue):
while True:
msg = msg_in_queue.get()
client.publish(msg[0], msg[1])
try:
msg = msg_in_queue.get()
result = client.publish(msg[0], msg[1])
if result.rc != mqtt.MQTT_ERR_SUCCESS:
logging.error("Failed publishing message to topic '%s' (rc=%s)", msg[0], result.rc)
except Exception: # pylint: disable=broad-exception-caught
logging.exception("Failed processing outgoing MQTT queue message")

View File

@@ -103,10 +103,13 @@ class LovelaceUIPanel:
libs.panel_cmd.page_type(self.msg_out_queue, self.sendTopic, "pageStartup")
def schedule_thread_target(self):
while True:
self.schedule.exec_jobs()
time.sleep(1)
def schedule_thread_target(self):
while True:
try:
self.schedule.exec_jobs()
except Exception:
logging.exception("Scheduler execution failed for panel '%s'", self.name)
time.sleep(1)
def update_time(self):
use_timezone = tz.gettz(self.settings["timeZone"])
@@ -200,12 +203,15 @@ class LovelaceUIPanel:
return card
return list(self.cards.values())[0]
def customrecv_event_callback(self, msg):
logging.debug("Recv Message from NsPanel (%s): %s", self.name, msg)
msg = msg.split(",")
# run action based on received command
if msg[0] == "event":
if msg[1] == "startup":
def customrecv_event_callback(self, msg):
logging.debug("Recv Message from NsPanel (%s): %s", self.name, msg)
msg = msg.split(",")
if len(msg) < 2:
logging.error("Malformed panel message on '%s': %s", self.name, msg)
return
# run action based on received command
if msg[0] == "event":
if msg[1] == "startup":
# TODO: Handle Update Messages
self.update_date()
self.update_time()
@@ -226,15 +232,19 @@ class LovelaceUIPanel:
self.render_current_page(switchPages=True)
if msg[1] == "renderCurrentPage":
self.render_current_page(requested=True)
if msg[1] == "buttonPress2":
entity_id = msg[2]
if entity_id == "":
return
btype = msg[3]
value = msg[4] if len(msg) > 4 else None
if btype == "bExit":
if entity_id in ["screensaver", "screensaver2"] and self.settings.get("screensaver").get("doubleTapToUnlock") and value == "1":
return
if msg[1] == "buttonPress2":
if len(msg) < 4:
logging.error("Malformed buttonPress2 payload on '%s': %s", self.name, msg)
return
entity_id = msg[2]
if entity_id == "":
return
btype = msg[3]
value = msg[4] if len(msg) > 4 else None
entity_config = {}
if btype == "bExit":
if entity_id in ["screensaver", "screensaver2"] and self.settings.get("screensaver").get("doubleTapToUnlock") and value == "1":
return
# in case privious_cards is empty add a default card
if len(self.privious_cards) == 0:
@@ -249,12 +259,12 @@ class LovelaceUIPanel:
return
# replace iid with real entity id
if entity_id.startswith("iid."):
iid = entity_id.split(".")[1]
for e in self.current_card.entities:
if e.iid == iid:
entity_id = e.entity_id
entity_config = e.config
if entity_id.startswith("iid."):
iid = entity_id.split(".")[1]
for e in self.current_card.entities:
if e.iid == iid:
entity_id = e.entity_id
entity_config = e.config
match btype:
case 'button':
@@ -285,17 +295,20 @@ class LovelaceUIPanel:
case _:
ha_control.handle_buttons(entity_id, btype, value)
if msg[1] == "pageOpenDetail":
entity_id = msg[3]
# replace iid with real entity id
if entity_id.startswith("iid."):
iid = entity_id.split(".")[1]
for e in self.current_card.entities:
if e.iid == iid:
entity_id = e.entity_id
effectList = None
if entity_id.startswith("light"):
effectList = e.config.get("effectList")
if msg[1] == "pageOpenDetail":
if len(msg) < 4:
logging.error("Malformed pageOpenDetail payload on '%s': %s", self.name, msg)
return
entity_id = msg[3]
effectList = None
# replace iid with real entity id
if entity_id.startswith("iid."):
iid = entity_id.split(".")[1]
for e in self.current_card.entities:
if e.iid == iid:
entity_id = e.entity_id
if entity_id.startswith("light"):
effectList = e.config.get("effectList")
if msg[2] == "popupInSel": #entity_id.split(".")[0] in ['input_select', 'media_player']:
libs.panel_cmd.entityUpdateDetail2(self.msg_out_queue, self.sendTopic, detail_open(self.settings["locale"], msg[2], entity_id, msg[3], self.msg_out_queue, sendTopic=self.sendTopic, options_list=effectList))
else: