redesign of config format

This commit is contained in:
Johannes Braun
2022-03-24 16:12:42 +01:00
parent 342a97df0f
commit 482cee0552
13 changed files with 467 additions and 665 deletions

View File

@@ -0,0 +1,118 @@
import logging
from luibackend.exceptions import LuiBackendConfigIncomplete
from luibackend.exceptions import LuiBackendConfigError
LOGGER = logging.getLogger(__name__)
class PageNode(object):
def __init__(self, data, parent=None):
self.data = data
self.name = None
self.childs = []
self.parent = parent
if "items" in data:
childs = data.pop("items")
for page in childs:
self.add_child(PageNode(page, self))
name = self.data.get("heading", "unkown") if type(self.data) is dict else self.data
ptype = self.data.get("type", "unkown") if type(self.data) is dict else "leaf"
#parent = self.parent.data.get("heading", self.parent.data.get("type", "unknown")) if self.parent is not None else "root"
self.name = f"{ptype}.{name}" if type(self.data) is dict else self.data
def add_child(self, obj):
self.childs.append(obj)
def dump(self, indent=0):
"""dump tree to string"""
tab = ' '*(indent-1) + ' |- ' if indent > 0 else ''
name = self.name
parent = self.parent.name if self.parent is not None else "root"
dumpstring = f"{tab}{name} -> {parent} \n"
for obj in self.childs:
dumpstring += obj.dump(indent + 1)
return dumpstring
def get_items(self):
items = []
for i in self.childs:
if len(i.childs) > 0:
items.append("navigate.todo")
else:
items.append(i.data)
return items
class LuiBackendConfig(object):
_DEFAULT_CONFIG = {
'panelRecvTopic': "tele/tasmota_your_mqtt_topic/RESULT",
'panelSendTopic': "cmnd/tasmota_your_mqtt_topic/CustomSend",
'updateMode': "auto-notify",
'timeoutScreensaver': 20,
'brightnessScreensaver': 20,
'locale': "en_US",
'timeFormat': "%H:%M",
'dateFormatBabel': "full",
'dateFormat': "%A, %d. %B %Y",
'pages': [{
'type': 'screensaver',
'weather': 'weather.example',
'items': [{
'type': 'cardEntities',
'heading': 'Test Entities 1',
'items': ['switch.test_item', 'switch.test_item', 'switch.test_item']
}, {
'type': 'cardGrid',
'heading': 'Test Grid 1',
'items': ['switch.test_item', 'switch.test_item', 'switch.test_item']
}
],
}],
}
def __init__(self, args=None, check=True):
self._config = {}
self._page_config = None
if args:
self.load(args)
if check:
self.check()
def load(self, args):
for k, v in args.items():
if k in self._DEFAULT_CONFIG:
self._config[k] = v
LOGGER.info(f"Loaded config: {self._config}")
root_page = self.get("pages")[0]
self._page_config = PageNode(root_page)
LOGGER.info(f"Parsed Page config to the following Tree: \n {self._page_config.dump()}")
def check(self):
errors = 0
def get(self, name):
value = self._config.get(name)
if value is None:
value = self._DEFAULT_CONFIG.get(name)
return value
def get_root_page(self):
return self._page_config
def get_child_by_heading(self, heading):
for page in self._current_page.childs:
if heading == page.data["heading"]:
self._current_page = page
return self._current_page

View File

@@ -0,0 +1,50 @@
import logging
import datetime
from pages import LuiPages
LOGGER = logging.getLogger(__name__)
class LuiController(object):
def __init__(self, ha_api, config, send_mqtt_msg):
self._ha_api = ha_api
self._config = config
self._send_mqtt_msg = send_mqtt_msg
self._current_page = None
self._previous_page = None
self._pages = LuiPages(ha_api, config, send_mqtt_msg)
# Setup time update callback
time = datetime.time(0, 0, 0)
ha_api.run_minutely(self._pages.update_time, time)
# send panel back to startup page on restart of this script
self._pages.page_type("pageStartup")
#{'type': 'sceensaver', 'weather': 'weather.k3ll3r', 'items': [{'type': 'cardEntities', 'heading': 'Test Entities 1', 'items': ['switch.test_item', {'type': 'cardEntities', 'heading': 'Test Entities 1', 'items': ['switch.test_item', 'switch.test_item', 'switch.test_item', 'switch.test_item']}, 'switch.test_item', 'switch.test_item']}, {'type': 'cardGrid', 'heading': 'Test Grid 1', 'items': ['switch.test_item', 'switch.test_item', 'switch.test_item']}]}
def startup(self, display_firmware_version):
LOGGER.info(f"Startup Event; Display Firmware Version is {display_firmware_version}")
# send time and date on startup
self._pages.update_time("")
self._pages.update_date("")
# send panel to root page
self._current_page = self._config.get_root_page()
self._pages.render_page(self._current_page)
def next(self):
return
def button_press(self, entity_id, btype, value):
LOGGER.debug(f"Button Press Event; entity_id: {entity_id}; btype: {btype}; value: {value} ")
if(entity_id == "screensaver" and btype == "enter"):
if self._previous_page is None:
self._pages.render_page(self._current_page.childs[0])
else:
self._pages.render_page(self._previous_page)

View File

@@ -0,0 +1,8 @@
class LuiBackendException(Exception):
pass
class LuiBackendConfigIncomplete(LuiBackendException):
pass
class LuiBackendConfigError(LuiBackendException):
pass

View File

@@ -0,0 +1,40 @@
import colorsys
import math
def scale(val, src, dst):
"""
Scale the given value from the scale of src to the scale of dst.
"""
return ((val - src[0]) / (src[1]-src[0])) * (dst[1]-dst[0]) + dst[0]
def hsv2rgb(h, s, v):
hsv = colorsys.hsv_to_rgb(h,s,v)
return tuple(round(i * 255) for i in hsv)
def pos_to_color(x, y):
r = 160/2
x = round((x - r) / r * 100) / 100
y = round((r - y) / r * 100) / 100
r = math.sqrt(x*x + y*y)
sat = 0
if (r > 1):
sat = 0
else:
sat = r
hsv = (math.degrees(math.atan2(y, x))%360/360, sat, 1)
rgb = hsv2rgb(hsv[0],hsv[1],hsv[2])
return rgb
def rgb_brightness(rgb_color, brightness):
red = rgb_color[0]/255*brightness
green = rgb_color[1]/255*brightness
blue = rgb_color[2]/255*brightness
return [red, green, blue]
def rgb_dec565(rgb_color):
red = rgb_color[0]
green = rgb_color[1]
blue = rgb_color[2]
# take in the red, green and blue values (0-255) as 8 bit values and then combine
# and shift them to make them a 16 bit dec value in 565 format.
return ((int(red / 255 * 31) << 11) | (int(green / 255 * 63) << 5) | (int(blue / 255 * 31)))

View File

@@ -0,0 +1,48 @@
icons = {
'alert-circle-outline': 0,
'lightbulb': 1,
'thermometer': 2,
'gesture-tap-button': 3,
'flash': 4,
'music': 5,
'check-circle-outline': 6,
'close-circle-outline': 7,
'pause': 8,
'play': 9,
'palette': 10,
'window-open': 11,
'weather-cloudy': 12,
'weather-fog': 13,
'weather-hail': 14,
'weather-lightning': 15,
'weather-lightning-rainy': 16,
'weather-night': 17,
'weather-partly-cloudy': 18,
'weather-pouring': 19,
'weather-rainy': 20,
'weather-snowy': 21,
'weather-snowy-rainy': 22,
'weather-sunny': 23,
'weather-windy': 24,
'weather-windy-variant': 25,
'water-percent': 26,
'power': 27,
'fire': 28,
'calendar-sync': 29,
'fan': 30,
'snowflake': 31,
'solar-power': 32,
'battery-charging-medium': 33,
'battery-medium': 34,
'shield-home': 35,
'door-open': 36,
'door-closed': 37,
'window-closed': 38,
}
def get_icon_id(ma_name):
if ma_name in icons:
return icons[ma_name]
else:
return icons["alert-circle-outline"]

View File

@@ -0,0 +1,64 @@
from icon_mapping import get_icon_id
weather_mapping = {
'clear-night': 'weather-night',
'cloudy': 'weather-cloudy',
'exceptional': 'alert-circle-outline',
'fog': 'weather-fog',
'hail': 'weather-hail',
'lightning': 'weather-lightning',
'lightning-rainy': 'weather-lightning-rainy',
'partlycloudy': 'weather-partly-cloudy',
'pouring': 'weather-pouring',
'rainy': 'weather-rainy',
'snowy': 'weather-snowy',
'snowy-rainy': 'weather-snowy-rainy',
'sunny': 'weather-sunny',
'windy': 'weather-windy',
'windy-variant': 'weather-windy-variant'
}
sensor_mapping_on = {
"door": "door-open",
}
sensor_mapping_off = {
"door": "door-closed",
}
sensor_mapping = {
"temperature": "thermometer",
"power": "flash"
}
def map_to_mdi_name(ha_type, state=None, device_class=None):
if ha_type == "weather":
return weather_mapping[state] if state in weather_mapping else "alert-circle-outline"
if ha_type == "button":
return "gesture-tap-button"
if ha_type == "scene":
return "palette"
if ha_type == "switch":
return "flash"
if ha_type == "light":
return "lightbulb"
if ha_type == "input_boolean":
return "check-circle-outline" if state == "on" else "close-circle-outline"
if ha_type == "cover":
return "window-open" if state == "open" else "window-closed"
elif ha_type == "sensor":
if state == "on":
return sensor_mapping_on[device_class] if device_class in sensor_mapping_on else "alert-circle-outline"
elif state == "off":
return sensor_mapping_off[device_class] if device_class in sensor_mapping_off else "alert-circle-outline"
else:
return sensor_mapping[device_class] if device_class in sensor_mapping else "alert-circle-outline"
return "alert-circle-outline"
def get_icon_id_ha(ha_name, state=None, device_class=None, overwrite=None):
if overwrite is not None:
return get_icon_id(overwrite)
return get_icon_id(map_to_mdi_name(ha_name, state, device_class))

View File

@@ -0,0 +1,38 @@
import json
import logging
LOGGER = logging.getLogger(__name__)
class LuiMqttListener(object):
def __init__(self, mqtt_api, topic, controller):
self._controller = controller
# Setup, mqtt subscription and callback
mqtt_api.mqtt_subscribe(topic=topic)
mqtt_api.listen_event(self.mqtt_event_callback, "MQTT_MESSAGE", topic=topic, namespace='mqtt')
def mqtt_event_callback(self, event_name, data, kwargs):
LOGGER.info(f'MQTT callback for: {data}')
# Parse Json Message from Tasmota and strip out message from nextion display
data = json.loads(data["payload"])
if("CustomRecv" not in data):
return
msg = data["CustomRecv"]
LOGGER.info(f"Received Message from Screen: {msg}")
# Split message into parts seperated by ","
msg = msg.split(",")
# run action based on received command
if msg[0] == "event":
if msg[1] == "startup":
display_firmware_version = int(msg[2])
self._controller.startup(display_firmware_version)
if msg[1] == "pageOpen":
self._controller.next()
if msg[1] == "buttonPress2":
entity_id = msg[2]
btype = msg[3]
value = msg[4] if len(msg) > 4 else None
self._controller.button_press(entity_id, btype, value)

View File

@@ -0,0 +1,165 @@
import logging
import datetime
from icon_mapping import get_icon_id
from icons import get_icon_id_ha
from helper import scale, pos_to_color, rgb_dec565, rgb_brightness
# check Babel
import importlib
babel_spec = importlib.util.find_spec("babel")
if babel_spec is not None:
import babel.dates
LOGGER = logging.getLogger(__name__)
class LuiPages(object):
def __init__(self, ha_api, config, send_mqtt_msg):
self._ha_api = ha_api
self._config = config
self._send_mqtt_msg = send_mqtt_msg
def getEntityColor(self, entity):
attr = entity.attributes
default_color_on = rgb_dec565([253, 216, 53])
default_color_off = rgb_dec565([68, 115, 158])
icon_color = default_color_on if entity.state == "on" else default_color_off
if "rgb_color" in attr:
color = attr.rgb_color
if "brightness" in attr:
color = rgb_brightness(color, attr.brightness)
icon_color = rgb_dec565(color)
elif "brightness" in attr:
color = rgb_brightness([253, 216, 53], attr.brightness)
icon_color = rgb_dec565(color)
return icon_color
def update_time(self, kwargs):
time = datetime.datetime.now().strftime(self._config.get("timeFormat"))
self._send_mqtt_msg(f"time,{time}")
def update_date(self, kwargs):
global babel_spec
if babel_spec is not None:
dateformat = self._config.get("dateFormatBabel")
locale = self._config.get("locale")
date = babel.dates.format_date(datetime.datetime.now(), dateformat, locale=locale)
else:
dateformat = self._config.get("dateFormat")
date = datetime.datetime.now().strftime(dateformat)
self._send_mqtt_msg(f"date,?{date}")
def page_type(self, target_page):
self._send_mqtt_msg(f"pageType,{target_page}")
def update_screensaver_weather(self, kwargs):
we_name = kwargs['weather']
unit = kwargs['unit']
if self._ha_api.entity_exists(we_name):
we = self._ha_api.get_entity(we_name)
else:
LOGGER.error("Skipping Weather Update, entitiy not found")
return
icon_cur = get_icon_id_ha("weather", state=we.state)
text_cur = f"{we.attributes.temperature}{unit}"
icon_cur_detail = get_icon_id("water-percent")
text_cur_detail = f"{we.attributes.humidity} %"
up1 = we.attributes.forecast[0]['datetime']
up1 = datetime.datetime.fromisoformat(up1)
icon1 = get_icon_id_ha("weather", state=we.attributes.forecast[0]['condition'])
down1 = we.attributes.forecast[0]['temperature']
up2 = we.attributes.forecast[1]['datetime']
up2 = datetime.datetime.fromisoformat(up2)
icon2 = get_icon_id_ha("weather", state=we.attributes.forecast[1]['condition'])
down2 = we.attributes.forecast[1]['temperature']
global babel_spec
if babel_spec is not None:
up1 = babel.dates.format_date(up1, "E", locale=self.config["locale"])
up2 = babel.dates.format_date(up2, "E", locale=self.config["locale"])
else:
up1 = up1.strftime("%a")
up2 = up2.strftime("%a")
self._send_mqtt_msg(f"weatherUpdate,?{icon_cur}?{text_cur}?{icon_cur_detail}?{text_cur_detail}?{up1}?{icon1}?{down1}?{up2}?{icon2}?{down2}")
def generate_entities_item(self, item):
icon = None
if type(item) is dict:
icon = next(iter(item.items()))[1]['icon']
item = next(iter(item.items()))[0]
# type of the item is the string before the "." in the item name
item_type = item.split(".")[0]
LOGGER.info(f"Generating item command for {item} with type {item_type}",)
# Internal Entities
if item_type == "delete":
return f",{item_type},,,,,"
if item_type == "navigate":
icon_id = get_icon_id_ha("button", overwrite=icon)
return f",button,{item},0,17299,{item},PRESS"
if not self._ha_api.entity_exists(item):
return f",text,{item},{get_icon_id('alert-circle-outline')},17299,Not found check, apps.yaml"
# HA Entities
entity = self._ha_api.get_entity(item)
name = entity.attributes.friendly_name
if item_type == "cover":
icon_id = get_icon_id_ha("cover", state=entity.state, overwrite=icon)
return f",shutter,{item},{icon_id},17299,{name},"
if item_type in "light":
switch_val = 1 if entity.state == "on" else 0
icon_color = self.getEntityColor(entity)
icon_id = get_icon_id_ha("light", overwrite=icon)
return f",{item_type},{item},{icon_id},{icon_color},{name},{switch_val}"
if item_type in ["switch", "input_boolean"]:
switch_val = 1 if entity.state == "on" else 0
icon_color = self.getEntityColor(entity)
icon_id = get_icon_id_ha(item_type, state=entity.state, overwrite=icon)
return f",switch,{item},{icon_id},{icon_color},{name},{switch_val}"
if item_type in ["sensor", "binary_sensor"]:
device_class = self.get_safe_ha_attribute(entity.attributes, "device_class", "")
icon_id = get_icon_id_ha("sensor", state=entity.state, device_class=device_class, overwrite=icon)
unit_of_measurement = self.get_safe_ha_attribute(entity.attributes, "unit_of_measurement", "")
value = entity.state + " " + unit_of_measurement
return f",text,{item},{icon_id},17299,{name},{value}"
if item_type in ["button", "input_button"]:
icon_id = get_icon_id_ha("button", overwrite=icon)
return f",button,{item},{icon_id},17299,{name},PRESS"
if item_type == "scene":
icon_id = get_icon_id_ha("scene", overwrite=icon)
return f",button,{item},{icon_id},17299,{name},ACTIVATE"
def generate_entities_page(self, heading, items):
# Set Heading of Page
self._send_mqtt_msg(f"entityUpdHeading,{heading}")
# Get items and construct cmd string
command = "entityUpd"
for item in items:
command += self.generate_entities_item(item)
self._send_mqtt_msg(command)
def render_page(self, page):
config = page.data
ptype = config["type"]
LOGGER.info(f"Started rendering of page x with type {ptype}")
# Switch to page
self.page_type(ptype)
if ptype == "screensaver":
we_name = config["weather"]
# update weather information
self.update_screensaver_weather(kwargs={"weather": we_name, "unit": "°C"})
return
if ptype == "cardEntities":
heading = config.get("heading", "unknown")
self.generate_entities_page(heading, page.get_items())
return