mirror of
https://github.com/joBr99/nspanel-lovelace-ui.git
synced 2025-12-20 22:47:01 +01:00
.
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
# https://developers.home-assistant.io/docs/add-ons/configuration#add-on-config
|
||||
name: NSPanel Lovelace UI Addon
|
||||
version: "4.7.17"
|
||||
version: "4.7.18"
|
||||
slug: nspanel-lovelace-ui
|
||||
description: NSPanel Lovelace UI Addon
|
||||
services:
|
||||
|
||||
@@ -4,6 +4,10 @@ import ha_colors
|
||||
from libs.localization import get_translation
|
||||
import panel_cards
|
||||
import logging
|
||||
import dateutil.parser as dp
|
||||
import babel
|
||||
from libs.icon_mapping import get_icon_char
|
||||
from libs.helper import rgb_dec565
|
||||
|
||||
class HAEntity(panel_cards.Entity):
|
||||
def __init__(self, locale, config, panel):
|
||||
@@ -15,7 +19,7 @@ class HAEntity(panel_cards.Entity):
|
||||
if data:
|
||||
self.state = data.get("state")
|
||||
self.attributes = data.get("attributes", [])
|
||||
print(data)
|
||||
#print(data)
|
||||
|
||||
# HA Entities
|
||||
entity_type_panel = "text"
|
||||
@@ -145,38 +149,52 @@ class HAEntity(panel_cards.Entity):
|
||||
value = value + unit_of_measurement
|
||||
if cardType in ["cardGrid", "cardGrid2"] and not self.icon_overwrite:
|
||||
icon_char = value
|
||||
|
||||
case 'binary_sensor':
|
||||
device_class = self.attributes.get("device_class", "")
|
||||
value = get_translation(self.locale, f"backend.component.binary_sensor.state.{device_class}.{entity.state}")
|
||||
case 'weather':
|
||||
attr = self.config.get("attribute", "temperature")
|
||||
value = str(self.attributes.get(attr, self.state))
|
||||
|
||||
# settings for forecast
|
||||
forecast_type = None
|
||||
if self.config.get("day"):
|
||||
forecast_type = "daily"
|
||||
pos = self.config.get("day")
|
||||
datetime_format = "E"
|
||||
if self.config.get("hour"):
|
||||
forecast_type = "hourly"
|
||||
pos = self.config.get("hour")
|
||||
datetime_format = "H:mm"
|
||||
if forecast_type:
|
||||
forecast = libs.home_assistant.execute_script(
|
||||
entity_name=self.entity_id,
|
||||
domain='weather',
|
||||
service="get_forecast",
|
||||
service_data={
|
||||
'type': forecast_type
|
||||
}
|
||||
).get("forecast", [])
|
||||
if len(forecast) > pos:
|
||||
forcast_pos = forecast[pos]
|
||||
forcast_condition = forcast_pos.get("condition", "")
|
||||
forcast_date = dp.parse(forcast_pos.get("datetime")).astimezone()
|
||||
|
||||
icon_char = ha_icons.get_icon_ha(self.etype, forcast_condition)
|
||||
name = babel.dates.format_datetime(forcast_date, datetime_format, locale=self.locale)
|
||||
|
||||
attr = self.config.get("attribute", "temperature")
|
||||
value = str(forcast_pos.get(attr, "not found"))
|
||||
else:
|
||||
name: "unknown"
|
||||
# add units
|
||||
if attr in ["temperature", "apparent_temperature", "templow"]:
|
||||
value += self.config.get("unit", "°C")
|
||||
else:
|
||||
value += self.config.get("unit", "")
|
||||
case _:
|
||||
name = "unsupported"
|
||||
|
||||
# elif entityType == "weather":
|
||||
# entityTypePanel = "text"
|
||||
# unit = get_attr_safe(entity, "temperature_unit", "")
|
||||
# if type(item.stype) == int and len(entity.attributes.forecast) >= item.stype:
|
||||
# fdate = dp.parse(
|
||||
# entity.attributes.forecast[item.stype]['datetime'])
|
||||
# global babel_spec
|
||||
# if babel_spec is not None:
|
||||
# dateformat = "E" if item.nameOverride is None else item.nameOverride
|
||||
# name = babel.dates.format_datetime(
|
||||
# fdate.astimezone(), dateformat, locale=self._locale)
|
||||
# else:
|
||||
# dateformat = "%a" if item.nameOverride is None else item.nameOverride
|
||||
# name = fdate.astimezone().strftime(dateformat)
|
||||
# icon_id = get_icon_ha(
|
||||
# entityId, stateOverwrite=entity.attributes.forecast[item.stype]['condition'])
|
||||
# value = f'{entity.attributes.forecast[item.stype].get("temperature", "")}{unit}'
|
||||
# color = self.get_entity_color(
|
||||
# entity, ha_type=entityType, stateOverwrite=entity.attributes.forecast[item.stype]['condition'], overwrite=colorOverride)
|
||||
# else:
|
||||
# value = f'{get_attr_safe(entity, "temperature", "")}{unit}'
|
||||
# else:
|
||||
#
|
||||
|
||||
return f"~{entity_type_panel}~iid.{self.iid}~{icon_char}~{color}~{name}~{value}"
|
||||
|
||||
class HACard(panel_cards.Card):
|
||||
@@ -184,6 +202,9 @@ class HACard(panel_cards.Card):
|
||||
super().__init__(locale, config, panel)
|
||||
# Generate Entity for each Entity in Config
|
||||
self.entities = []
|
||||
if "entity" in config:
|
||||
iid, entity = entity_factory(locale, config, panel)
|
||||
self.entities.append(entity)
|
||||
if "entities" in config:
|
||||
for e in config.get("entities"):
|
||||
iid, entity = entity_factory(locale, e, panel)
|
||||
@@ -257,10 +278,144 @@ class PowerCard(HACard):
|
||||
result += f"~{speed}"
|
||||
return result
|
||||
|
||||
class Screensaver(HACard):
|
||||
class MediaCard(HACard):
|
||||
def __init__(self, locale, config, panel):
|
||||
super().__init__(locale, config, panel)
|
||||
|
||||
def render(self):
|
||||
main_entity = self.entities[0]
|
||||
media_icon = main_entity.render()
|
||||
if not self.title:
|
||||
self.title = main_entity.attributes.get("friendly_name", "unknown")
|
||||
title = main_entity.attributes.get("media_title", "")
|
||||
author = main_entity.attributes.get("media_artist", "")
|
||||
volume = int(main_entity.attributes.get("media_artist", 0)*100)
|
||||
iconplaypause = get_icon_char("pause") if main_entity.state == "playing" else get_icon_char("play")
|
||||
onoffbutton = "disable"
|
||||
shuffleBtn = "disable"
|
||||
|
||||
bits = main_entity.attributes.get("supported_features")
|
||||
if bits & 0b10000000:
|
||||
if main_entity.state == "off":
|
||||
onoffbutton = 1374
|
||||
else:
|
||||
onoffbutton = rgb_dec565([255,152,0])
|
||||
|
||||
if bits & 0b100000000000000:
|
||||
shuffle = main_entity.attributes.get("shuffle", "")
|
||||
if shuffle == False:
|
||||
shuffleBtn = get_icon_char('shuffle-disabled')
|
||||
elif shuffle == True:
|
||||
shuffleBtn = get_icon_char('shuffle')
|
||||
|
||||
# generate button entities
|
||||
button_str = ""
|
||||
for e in self.entities[1:]:
|
||||
button_str += e.render()
|
||||
result = f"{self.title}~{self.gen_nav()}~{main_entity.entity_id}~{title}~~{author}~~{volume}~{iconplaypause}~{onoffbutton}~{shuffleBtn}{media_icon}{button_str}"
|
||||
return result
|
||||
|
||||
class ClimateCard(HACard):
|
||||
def __init__(self, locale, config, panel):
|
||||
super().__init__(locale, config, panel)
|
||||
|
||||
def render(self):
|
||||
main_entity = self.entities[0]
|
||||
|
||||
#TODO: temp unit
|
||||
temp_unit = "celsius"
|
||||
if(temp_unit == "celsius"):
|
||||
temperature_unit_icon = get_icon_char("temperature-celsius")
|
||||
temperature_unit = "°C"
|
||||
|
||||
else:
|
||||
temperature_unit_icon = get_icon_char("temperature-fahrenheit")
|
||||
temperature_unit = "°F"
|
||||
|
||||
main_entity.render()
|
||||
if not self.title:
|
||||
self.title = main_entity.attributes.get("friendly_name", "unknown")
|
||||
current_temp = main_entity.attributes.get("current_temperature", "")
|
||||
dest_temp = main_entity.attributes.get("temperature", None)
|
||||
dest_temp2 = ""
|
||||
if dest_temp is None:
|
||||
dest_temp = main_entity.attributes.get("target_temp_high", 0)
|
||||
dest_temp2 = main_entity.attributes.get("target_temp_low", None)
|
||||
if dest_temp2 != None and dest_temp2 != "null":
|
||||
dest_temp2 = int(dest_temp2*10)
|
||||
else:
|
||||
dest_temp2 = ""
|
||||
dest_temp = int(dest_temp*10)
|
||||
|
||||
hvac_action = main_entity.attributes.get("hvac_action", "")
|
||||
state_value = ""
|
||||
if hvac_action != "":
|
||||
state_value = get_translation(self.locale, f"frontend.state_attributes.climate.hvac_action.{hvac_action}")
|
||||
state_value += "\r\n("
|
||||
state_value += get_translation(self.locale, f"backend.component.climate.state._.{main_entity.state}")
|
||||
if hvac_action != "":
|
||||
state_value += ")"
|
||||
|
||||
min_temp = int(main_entity.attributes.get("min_temp", 0)*10)
|
||||
max_temp = int(main_entity.attributes.get("max_temp", 0)*10)
|
||||
step_temp = int(main_entity.attributes.get("target_temp_step", 0.5)*10)
|
||||
icon_res_list = []
|
||||
icon_res = ""
|
||||
|
||||
hvac_modes = main_entity.attributes.get("hvac_modes", [])
|
||||
if main_entity.config.get("supported_modes"):
|
||||
hvac_modes = main_entity.config.get("supported_modes")
|
||||
for mode in hvac_modes:
|
||||
icon_id = ha_icons.get_icon_ha("climate", mode)
|
||||
color_on = 64512
|
||||
if mode in ["auto", "heat_cool"]:
|
||||
color_on = 1024
|
||||
if mode == "heat":
|
||||
color_on = 64512
|
||||
if mode == "off":
|
||||
color_on = 35921
|
||||
if mode == "cool":
|
||||
color_on = 11487
|
||||
if mode == "dry":
|
||||
color_on = 60897
|
||||
if mode == "fan_only":
|
||||
color_on = 35921
|
||||
state = 0
|
||||
if(mode == main_entity.state):
|
||||
state = 1
|
||||
|
||||
icon_res_list.append(f"~{icon_id}~{color_on}~{state}~{mode}")
|
||||
|
||||
icon_res = "".join(icon_res_list)
|
||||
|
||||
if len(icon_res_list) == 1 and not self.panel.model == "us-p":
|
||||
icon_res = "~"*4 + icon_res_list[0] + "~"*4*6
|
||||
elif len(icon_res_list) == 2 and not self.panel.model == "us-p":
|
||||
icon_res = "~"*4*2 + icon_res_list[0] + "~"*4*2 + icon_res_list[1] + "~"*4*2
|
||||
elif len(icon_res_list) == 3 and not self.panel.model == "us-p":
|
||||
icon_res = "~"*4*2 + icon_res_list[0] + "~"*4 + icon_res_list[1] + "~"*4 + icon_res_list[2] + "~"*4
|
||||
elif len(icon_res_list) == 4 and not self.panel.model == "us-p":
|
||||
icon_res = "~"*4 + icon_res_list[0] + "~"*4 + icon_res_list[1] + "~"*4 + icon_res_list[2] + "~"*4 + icon_res_list[3]
|
||||
elif len(icon_res_list) >= 5 or self.panel.model == "us-p":
|
||||
icon_res = "".join(icon_res_list) + "~"*4*(8-len(icon_res_list))
|
||||
|
||||
currently_translation = get_translation(self.locale, "frontend.ui.card.climate.currently")
|
||||
state_translation = get_translation(self.locale, "frontend.ui.panel.config.devices.entities.state")
|
||||
action_translation = get_translation(self.locale, "frontend.ui.card.climate.operation").replace(' ','\r\n')
|
||||
|
||||
detailPage = "1"
|
||||
if any(x in ["preset_modes", "swing_modes", "fan_modes"] for x in main_entity.attributes):
|
||||
detailPage = "0"
|
||||
|
||||
result = f"{self.title}~{self.gen_nav()}~{main_entity.entity_id}~{current_temp} {temperature_unit}~{dest_temp}~{state_value}~{min_temp}~{max_temp}~{step_temp}{icon_res}~{currently_translation}~{state_translation}~{action_translation}~{temperature_unit_icon}~{dest_temp2}~{detailPage}"
|
||||
return result
|
||||
|
||||
|
||||
class Screensaver(HACard):
|
||||
def __init__(self, locale, config, panel):
|
||||
super().__init__(locale, config, panel)
|
||||
if not self.type:
|
||||
self.type = "screensaver"
|
||||
def render(self):
|
||||
result = ""
|
||||
for e in self.entities:
|
||||
@@ -276,6 +431,10 @@ def card_factory(locale, settings, panel):
|
||||
card = QRCard(locale, settings, panel)
|
||||
case 'cardPower':
|
||||
card = PowerCard(locale, settings, panel)
|
||||
case 'cardMedia':
|
||||
card = MediaCard(locale, settings, panel)
|
||||
case 'cardThermo':
|
||||
card = ClimateCard(locale, settings, panel)
|
||||
case _:
|
||||
logging.error("card type %s not implemented", settings["type"])
|
||||
return "NotImplemented", None
|
||||
|
||||
@@ -1,5 +1,12 @@
|
||||
import libs.home_assistant
|
||||
import logging
|
||||
import time
|
||||
|
||||
def wait_for_ha_cache():
|
||||
mustend = time.time() + 5
|
||||
while time.time() < mustend:
|
||||
if len(libs.home_assistant.home_assistant_entity_state_cache) == 0:
|
||||
time.sleep(0.1)
|
||||
|
||||
def handle_buttons(entity_id, btype, value):
|
||||
match btype:
|
||||
@@ -9,12 +16,8 @@ def handle_buttons(entity_id, btype, value):
|
||||
on_off(entity_id, value)
|
||||
case 'number-set':
|
||||
if entity_id.startswith('fan'):
|
||||
logging.error("nuber-set fan not implemented")
|
||||
# entity = apis.ha_api.get_entity(entity_id)
|
||||
# value = float(value) * \
|
||||
# float(entity.attributes.get("percentage_step", 0))
|
||||
# entity.call_service("set_percentage", percentage=value)
|
||||
else:
|
||||
attr = libs.home_assistant.get_entity_data(entity_id).get('attributes', [])
|
||||
value = float(value) * float(attr.get(percentage_step, 0))
|
||||
service_data = {
|
||||
"value": int(value)
|
||||
}
|
||||
@@ -48,68 +51,16 @@ def handle_buttons(entity_id, btype, value):
|
||||
"tilt_position": int(value)
|
||||
}
|
||||
call_ha_service(entity_id, "set_cover_tilt_position", service_data=service_data)
|
||||
case 'media-OnOff':
|
||||
state = libs.home_assistant.get_entity_data(entity_id).get('state', '')
|
||||
if state == "off":
|
||||
call_ha_service(entity_id, "turn_on")
|
||||
else:
|
||||
call_ha_service(entity_id, "turn_off")
|
||||
|
||||
case _:
|
||||
logging.error("Not implemented: %s", btype)
|
||||
|
||||
def call_ha_service(entity_id, service, service_data = {}):
|
||||
etype = entity_id.split(".")[0]
|
||||
libs.home_assistant.call_service(
|
||||
entity_name=entity_id,
|
||||
domain=etype,
|
||||
service=service,
|
||||
service_data=service_data
|
||||
)
|
||||
|
||||
def button_press(entity_id, value):
|
||||
etype = entity_id.split(".")[0]
|
||||
match etype:
|
||||
case 'scene' | 'script':
|
||||
call_ha_service(entity_id, "turn_on")
|
||||
case 'light' | 'switch' | 'input_boolean' | 'automation' | 'fan':
|
||||
call_ha_service(entity_id, "toggle")
|
||||
#case 'lock':
|
||||
# elif entity_id.startswith('lock'):
|
||||
# if apis.ha_api.get_entity(entity_id).state == "locked":
|
||||
# apis.ha_api.get_entity(entity_id).call_service("unlock")
|
||||
# else:
|
||||
# apis.ha_api.get_entity(entity_id).call_service("lock")
|
||||
case 'button' | 'input_button':
|
||||
call_ha_service(entity_id, "press")
|
||||
case 'input_select' | 'select':
|
||||
call_ha_service(entity_id, "select_next")
|
||||
#case 'vacuum':
|
||||
# elif entity_id.startswith('vacuum'):
|
||||
# if apis.ha_api.get_entity(entity_id).state == "docked":
|
||||
# apis.ha_api.get_entity(entity_id).call_service("start")
|
||||
# else:
|
||||
# apis.ha_api.get_entity(
|
||||
# entity_id).call_service("return_to_base")
|
||||
case _:
|
||||
logging.error("buttonpress for entity type %s not implemented", etype)
|
||||
|
||||
# elif entity_id.startswith('service'):
|
||||
# apis.ha_api.call_service(entity_id.replace(
|
||||
# 'service.', '', 1).replace('.', '/', 1), **entity_config.data)
|
||||
|
||||
def on_off(entity_id, value):
|
||||
etype = entity_id.split(".")[0]
|
||||
match etype:
|
||||
case 'light' | 'switch' | 'input_boolean' | 'automation' | 'fan':
|
||||
service = "turn_off"
|
||||
if value == "1":
|
||||
service = "turn_on"
|
||||
libs.home_assistant.call_service(
|
||||
entity_name=entity_id,
|
||||
domain=etype,
|
||||
service=service,
|
||||
service_data={}
|
||||
)
|
||||
case _:
|
||||
logging.error(
|
||||
"Control action on_off not implemented for %s", entity_id)
|
||||
|
||||
|
||||
# if button_type == "media-OnOff":
|
||||
# if apis.ha_api.get_entity(entity_id).state == "off":
|
||||
# apis.ha_api.get_entity(entity_id).call_service("turn_on")
|
||||
@@ -238,3 +189,58 @@ def on_off(entity_id, value):
|
||||
# else:
|
||||
# apis.ha_api.get_entity(entity_id).call_service("start")
|
||||
|
||||
def call_ha_service(entity_id, service, service_data = {}):
|
||||
etype = entity_id.split(".")[0]
|
||||
libs.home_assistant.call_service(
|
||||
entity_name=entity_id,
|
||||
domain=etype,
|
||||
service=service,
|
||||
service_data=service_data
|
||||
)
|
||||
|
||||
def button_press(entity_id, value):
|
||||
etype = entity_id.split(".")[0]
|
||||
match etype:
|
||||
case 'scene' | 'script':
|
||||
call_ha_service(entity_id, "turn_on")
|
||||
case 'light' | 'switch' | 'input_boolean' | 'automation' | 'fan':
|
||||
call_ha_service(entity_id, "toggle")
|
||||
case 'lock':
|
||||
state = libs.home_assistant.get_entity_data(entity_id).get('state', '')
|
||||
if state == "locked":
|
||||
call_ha_service(entity_id, "unlock")
|
||||
else:
|
||||
call_ha_service(entity_id, "lock")
|
||||
case 'button' | 'input_button':
|
||||
call_ha_service(entity_id, "press")
|
||||
case 'input_select' | 'select':
|
||||
call_ha_service(entity_id, "select_next")
|
||||
case 'vacuum':
|
||||
state = libs.home_assistant.get_entity_data(entity_id).get('state', '')
|
||||
if state == "docked":
|
||||
call_ha_service(entity_id, "start")
|
||||
else:
|
||||
call_ha_service(entity_id, "return_to_base")
|
||||
case _:
|
||||
logging.error("buttonpress for entity type %s not implemented", etype)
|
||||
|
||||
# elif entity_id.startswith('service'):
|
||||
# apis.ha_api.call_service(entity_id.replace(
|
||||
# 'service.', '', 1).replace('.', '/', 1), **entity_config.data)
|
||||
|
||||
def on_off(entity_id, value):
|
||||
etype = entity_id.split(".")[0]
|
||||
match etype:
|
||||
case 'light' | 'switch' | 'input_boolean' | 'automation' | 'fan':
|
||||
service = "turn_off"
|
||||
if value == "1":
|
||||
service = "turn_on"
|
||||
libs.home_assistant.call_service(
|
||||
entity_name=entity_id,
|
||||
domain=etype,
|
||||
service=service,
|
||||
service_data={}
|
||||
)
|
||||
case _:
|
||||
logging.error(
|
||||
"Control action on_off not implemented for %s", entity_id)
|
||||
@@ -14,6 +14,7 @@ next_id = 0
|
||||
request_all_states_id = 0
|
||||
ws_connected = False
|
||||
home_assistant_entity_state_cache = {}
|
||||
response_buffer = {}
|
||||
|
||||
ON_CONNECT_HANDLER = None
|
||||
ON_DISCONNECT_HANDLER = None
|
||||
@@ -42,7 +43,7 @@ def register_on_disconnect_handler(handler):
|
||||
|
||||
|
||||
def on_message(ws, message):
|
||||
global auth_ok, request_all_states_id, home_assistant_entity_state_cache
|
||||
global auth_ok, request_all_states_id, home_assistant_entity_state_cache, response_buffer
|
||||
json_msg = json.loads(message)
|
||||
if json_msg["type"] == "auth_required":
|
||||
authenticate_client()
|
||||
@@ -63,9 +64,11 @@ def on_message(ws, message):
|
||||
elif json_msg["type"] == "result" and json_msg["success"]:
|
||||
if json_msg["id"] == request_all_states_id:
|
||||
for entity in json_msg["result"]:
|
||||
# logging.debug(f"test {entity['entity_id']}")
|
||||
home_assistant_entity_state_cache[entity["entity_id"]] = entity
|
||||
# logging.debug(f"request_all_states_id {json_msg['result']}")
|
||||
else:
|
||||
if json_msg["id"] in response_buffer:
|
||||
response_buffer[json_msg["id"]] = json_msg["result"]
|
||||
|
||||
return None # Ignore success result messages
|
||||
else:
|
||||
logging.debug(message)
|
||||
@@ -138,108 +141,11 @@ def _get_all_states():
|
||||
request_all_states_id = next_id
|
||||
send_message(json.dumps(msg))
|
||||
|
||||
# Got new value from Home Assistant, publish to MQTT
|
||||
|
||||
|
||||
# Got new value from Home Assistant, send update to callback method
|
||||
def send_entity_update(entity_id):
|
||||
global on_ha_update
|
||||
on_ha_update(entity_id)
|
||||
|
||||
def set_entity_brightness(home_assistant_name: str, light_level: int, color_temp: int) -> bool:
|
||||
"""Set entity brightness"""
|
||||
global next_id
|
||||
try:
|
||||
# Format Home Assistant state update
|
||||
msg = None
|
||||
if home_assistant_name.startswith("light."):
|
||||
msg = {
|
||||
"id": next_id,
|
||||
"type": "call_service",
|
||||
"domain": "light",
|
||||
"service": "turn_on",
|
||||
"service_data": {
|
||||
"brightness_pct": light_level,
|
||||
},
|
||||
"target": {
|
||||
"entity_id": home_assistant_name
|
||||
}
|
||||
}
|
||||
if color_temp > 0:
|
||||
msg["service_data"]["kelvin"] = color_temp
|
||||
elif home_assistant_name.startswith("switch."):
|
||||
msg = {
|
||||
"id": next_id,
|
||||
"type": "call_service",
|
||||
"domain": "switch",
|
||||
"service": "turn_on" if light_level > 0 else "turn_off",
|
||||
"target": {
|
||||
"entity_id": home_assistant_name
|
||||
}
|
||||
}
|
||||
|
||||
if msg:
|
||||
send_message(json.dumps(msg))
|
||||
return True
|
||||
else:
|
||||
logging.error(F"{home_assistant_name} is now a recognized domain.")
|
||||
return False
|
||||
except:
|
||||
logging.exception("Failed to send entity update to Home Assisatant.")
|
||||
return False
|
||||
|
||||
|
||||
def set_entity_color_temp(entity_name: str, color_temp: int) -> bool:
|
||||
"""Set entity brightness"""
|
||||
global next_id
|
||||
try:
|
||||
# Format Home Assistant state update
|
||||
msg = {
|
||||
"id": next_id,
|
||||
"type": "call_service",
|
||||
"domain": "light",
|
||||
"service": "turn_on",
|
||||
"service_data": {
|
||||
"kelvin": color_temp
|
||||
},
|
||||
"target": {
|
||||
"entity_id": entity_name
|
||||
}
|
||||
}
|
||||
send_message(json.dumps(msg))
|
||||
return True
|
||||
except:
|
||||
logging.exception("Failed to send entity update to Home Assisatant.")
|
||||
return False
|
||||
|
||||
|
||||
def set_entity_color_saturation(entity_name: str, light_level: int, color_saturation: int, color_hue: int) -> bool:
|
||||
"""Set entity brightness"""
|
||||
global next_id
|
||||
try:
|
||||
# Format Home Assistant state update
|
||||
msg = {
|
||||
"id": next_id,
|
||||
"type": "call_service",
|
||||
"domain": "light",
|
||||
"service": "turn_on",
|
||||
"service_data": {
|
||||
"hs_color": [
|
||||
color_hue,
|
||||
color_saturation
|
||||
],
|
||||
"brightness_pct": light_level
|
||||
},
|
||||
"target": {
|
||||
"entity_id": entity_name
|
||||
}
|
||||
}
|
||||
send_message(json.dumps(msg))
|
||||
return True
|
||||
except Exception as e:
|
||||
logging.exception("Failed to send entity update to Home Assisatant.")
|
||||
return False
|
||||
|
||||
|
||||
def call_service(entity_name: str, domain: str, service: str, service_data: dict) -> bool:
|
||||
global next_id
|
||||
try:
|
||||
@@ -251,14 +157,51 @@ def call_service(entity_name: str, domain: str, service: str, service_data: dict
|
||||
"service_data": service_data,
|
||||
"target": {
|
||||
"entity_id": entity_name
|
||||
}
|
||||
},
|
||||
}
|
||||
send_message(json.dumps(msg))
|
||||
return True
|
||||
except Exception as e:
|
||||
logging.exception("Failed to send entity update to Home Assisatant.")
|
||||
logging.exception("Failed to call Home Assisatant service.")
|
||||
return False
|
||||
|
||||
def execute_script(entity_name: str, domain: str, service: str, service_data: dict) -> str:
|
||||
global next_id, response_buffer
|
||||
try:
|
||||
call_id = next_id
|
||||
# request answer for this call
|
||||
response_buffer[call_id] = True
|
||||
msg = {
|
||||
"id": call_id,
|
||||
"type": "execute_script",
|
||||
"sequence": [
|
||||
{
|
||||
"service": f"{domain}.{service}",
|
||||
"data": service_data,
|
||||
"target": {
|
||||
"entity_id": [entity_name]
|
||||
},
|
||||
"response_variable": "service_result"
|
||||
},
|
||||
{
|
||||
"stop": "done",
|
||||
"response_variable": "service_result"
|
||||
}
|
||||
]
|
||||
}
|
||||
send_message(json.dumps(msg))
|
||||
# busy waiting for response with a timeout of 0.2 seconds - maybe there's a better way of doing this
|
||||
mustend = time.time() + 0.2
|
||||
while time.time() < mustend:
|
||||
if response_buffer[call_id] == True:
|
||||
#print(f'loooooooooop {time.time()}')
|
||||
time.sleep(0.0001)
|
||||
else:
|
||||
return response_buffer[call_id]["response"]
|
||||
raise TimeoutError("Did not recive respose in time to HA script call")
|
||||
except Exception as e:
|
||||
logging.exception("Failed to call Home Assisatant script.")
|
||||
return False
|
||||
|
||||
def get_entity_data(entity_id: str):
|
||||
if entity_id in home_assistant_entity_state_cache:
|
||||
|
||||
@@ -66,6 +66,11 @@ def on_message(client, userdata, msg):
|
||||
logging.error(
|
||||
"Something went wrong when processing the exception message, couldn't decode payload to utf-8.")
|
||||
|
||||
def get_config_file():
|
||||
CONFIG_FILE = os.getenv('CONFIG_FILE')
|
||||
if not CONFIG_FILE:
|
||||
CONFIG_FILE = './config.yml'
|
||||
return CONFIG_FILE
|
||||
|
||||
def get_config(file):
|
||||
global settings
|
||||
@@ -92,9 +97,9 @@ def get_config(file):
|
||||
settings["home_assistant_address"] = "http://supervisor"
|
||||
settings["is_addon"] = True
|
||||
|
||||
print(settings["home_assistant_token"])
|
||||
print(settings["home_assistant_address"])
|
||||
print(settings["is_addon"])
|
||||
#print(settings["home_assistant_token"])
|
||||
#print(settings["home_assistant_address"])
|
||||
#print(settings["is_addon"])
|
||||
|
||||
def connect():
|
||||
global settings, home_assistant, client
|
||||
@@ -164,24 +169,21 @@ def config_watch():
|
||||
|
||||
logging.info('Watching for changes in config file')
|
||||
project_files = []
|
||||
project_files.append("/share/config.yml")
|
||||
project_files.append(get_config_file())
|
||||
handler = ConfigChangeEventHandler(project_files)
|
||||
observer = Observer()
|
||||
observer.schedule(handler, path='/share', recursive=True)
|
||||
observer.schedule(handler, path=os.path.dirname(get_config_file()), recursive=True)
|
||||
observer.start()
|
||||
while True:
|
||||
time.sleep(1)
|
||||
|
||||
def signal_handler(signum, frame):
|
||||
print(f"Received signal {signum}. Initiating restart...")
|
||||
logging.info(f"Received signal {signum}. Initiating restart...")
|
||||
python = sys.executable
|
||||
os.execl(python, python, *sys.argv)
|
||||
|
||||
if __name__ == '__main__':
|
||||
CONFIG_FILE = os.getenv('CONFIG_FILE')
|
||||
if not CONFIG_FILE:
|
||||
CONFIG_FILE = 'config.yml'
|
||||
get_config(CONFIG_FILE)
|
||||
get_config(get_config_file())
|
||||
signal.signal(signal.SIGTERM, signal_handler)
|
||||
threading.Thread(target=config_watch).start()
|
||||
connect()
|
||||
|
||||
@@ -10,7 +10,6 @@ import babel.dates
|
||||
from ha_cards import Screensaver, EntitiesCard, card_factory
|
||||
import ha_control
|
||||
|
||||
|
||||
class LovelaceUIPanel:
|
||||
|
||||
def __init__(self, mqtt_client_from_manager, name_panel, settings_panel):
|
||||
@@ -87,7 +86,6 @@ class LovelaceUIPanel:
|
||||
libs.panel_cmd.send_date(self.sendTopic, date_string)
|
||||
|
||||
def searchCard(self, iid):
|
||||
print(f"searchcard {iid} {self.navigate_keys}")
|
||||
if iid in self.navigate_keys:
|
||||
iid = self.navigate_keys[iid]
|
||||
if iid in self.cards:
|
||||
@@ -110,18 +108,17 @@ class LovelaceUIPanel:
|
||||
self.update_date()
|
||||
self.update_time()
|
||||
|
||||
libs.panel_cmd.page_type(self.sendTopic, "screensaver")
|
||||
self.current_card = Screensaver(
|
||||
self.settings["locale"], self.settings["screensaver"], self)
|
||||
libs.panel_cmd.weatherUpdate(
|
||||
self.sendTopic, self.current_card.render())
|
||||
# check if ha state cache is already populated
|
||||
ha_control.wait_for_ha_cache()
|
||||
|
||||
self.current_card = Screensaver(self.settings["locale"], self.settings["screensaver"], self)
|
||||
libs.panel_cmd.page_type(self.sendTopic, self.current_card.type)
|
||||
libs.panel_cmd.weatherUpdate(self.sendTopic, self.current_card.render())
|
||||
if msg[1] == "sleepReached":
|
||||
self.privious_cards.append(self.current_card)
|
||||
libs.panel_cmd.page_type(self.sendTopic, "screensaver")
|
||||
self.current_card = Screensaver(
|
||||
self.settings["locale"], self.settings["screensaver"], self)
|
||||
libs.panel_cmd.weatherUpdate(
|
||||
self.sendTopic, self.current_card.render())
|
||||
self.current_card = Screensaver(self.settings["locale"], self.settings["screensaver"], self)
|
||||
libs.panel_cmd.page_type(self.sendTopic, self.current_card.type)
|
||||
libs.panel_cmd.weatherUpdate(self.sendTopic, self.current_card.render())
|
||||
if msg[1] == "buttonPress2":
|
||||
entity_id = msg[2]
|
||||
btype = msg[3]
|
||||
|
||||
@@ -24,6 +24,7 @@ class Entity:
|
||||
self.locale = locale
|
||||
self.entity_id = config["entity"]
|
||||
self.etype = self.entity_id.split(".")[0]
|
||||
self.config = config
|
||||
self.panel = panel
|
||||
self.icon_overwrite = config.get("icon", None)
|
||||
self.name_overwrite = config.get("name", None)
|
||||
|
||||
Reference in New Issue
Block a user