merged main into pr

This commit is contained in:
Johannes
2022-03-25 15:29:40 +01:00
27 changed files with 940 additions and 907 deletions

View File

@@ -0,0 +1,131 @@
import logging
LOGGER = logging.getLogger(__name__)
class PageNode(object):
def __init__(self, data, parent=None):
self.data = data
self.name = None
self.childs = []
self.parent = parent
self.pos = None
if "items" in data:
childs = data.pop("items")
for page in childs:
self.add_child(PageNode(page, self))
name = self.data.get("heading", "unkown") if type(self.data) is dict else self.data
ptype = self.data.get("type", "unkown") if type(self.data) is dict else "leaf"
self.name = f"{ptype}.{name}" if type(self.data) is dict else self.data
def add_child(self, obj):
obj.pos = len(self.childs)
self.childs.append(obj)
def next(self):
if self.parent is not None:
pos = self.pos
length = len(self.parent.childs)
return self.parent.childs[(pos+1)%length]
else:
return self
def prev(self):
if self.parent is not None:
pos = self.pos
length = len(self.parent.childs)
return self.parent.childs[(pos-1)%length]
else:
return self
def dump(self, indent=0):
"""dump tree to string"""
tab = ' '*(indent-1) + ' |- ' if indent > 0 else ''
name = self.name
parent = self.parent.name if self.parent is not None else "root"
dumpstring = f"{tab}{self.pos}:{name} -> {parent} \n"
for obj in self.childs:
dumpstring += obj.dump(indent + 1)
return dumpstring
def get_items(self):
items = []
for i in self.childs:
if len(i.childs) > 0:
items.append("navigate.todo")
else:
items.append(i.data)
return items
def get_all_items_recursive(self):
items = []
for i in self.childs:
if len(i.childs) > 0:
items.extend(i.get_all_items_recursive())
else:
if type(i.data) is dict:
items.append(i.data.get("item", next(iter(i.data))))
else:
items.append(i.data)
return items
class LuiBackendConfig(object):
_DEFAULT_CONFIG = {
'panelRecvTopic': "tele/tasmota_your_mqtt_topic/RESULT",
'panelSendTopic': "cmnd/tasmota_your_mqtt_topic/CustomSend",
'updateMode': "auto-notify",
'timeoutScreensaver': 20,
'brightnessScreensaver': 20,
'locale': "en_US",
'timeFormat': "%H:%M",
'dateFormatBabel': "full",
'dateFormat': "%A, %d. %B %Y",
'weather': 'weather.example',
'pages': [{
'type': 'cardEntities',
'heading': 'Test Entities 1',
'items': ['switch.test_item', 'switch.test_item', 'switch.test_item']
}, {
'type': 'cardGrid',
'heading': 'Test Grid 1',
'items': ['switch.test_item', 'switch.test_item', 'switch.test_item']
}
]
}
def __init__(self, args=None, check=True):
self._config = {}
self._page_config = None
if args:
self.load(args)
if check:
self.check()
def load(self, args):
for k, v in args.items():
if k in self._DEFAULT_CONFIG:
self._config[k] = v
LOGGER.info(f"Loaded config: {self._config}")
root_page = {"items": self.get("pages"), "name": "root"}
self._page_config = PageNode(root_page)
LOGGER.info(f"Parsed Page config to the following Tree: \n {self._page_config.dump()}")
def check(self):
return
def get(self, name):
value = self._config.get(name)
if value is None:
value = self._DEFAULT_CONFIG.get(name)
return value
def get_root_page(self):
return self._page_config

View File

@@ -0,0 +1,178 @@
import logging
import datetime
from helper import scale, pos_to_color
from pages import LuiPagesGen
LOGGER = logging.getLogger(__name__)
class LuiController(object):
def __init__(self, ha_api, config, send_mqtt_msg):
self._ha_api = ha_api
self._config = config
self._send_mqtt_msg = send_mqtt_msg
# first child of root page (default, after startup)
self._current_page = self._config._page_config.childs[0]
self._pages_gen = LuiPagesGen(ha_api, config, send_mqtt_msg)
# send panel back to startup page on restart of this script
self._pages_gen.page_type("pageStartup")
# time update callback
time = datetime.time(0, 0, 0)
ha_api.run_minutely(self._pages_gen.update_time, time)
# weather callback
weather_interval = 15 * 60 # 15 minutes
ha_api.run_every(self.weather_update, "now", weather_interval)
# register callbacks
self.register_callbacks()
self.current_screensaver_brightness = 20
# calc screensaver brightness
# set brightness of screensaver
if type(self._config.get("brightnessScreensaver")) == int:
self.current_screensaver_brightness = self._config.get("brightnessScreensaver")
elif type(self._config.get("brightnessScreensaver")) == list:
sorted_timesets = sorted(self._config.get("brightnessScreensaver"), key=lambda d: self._ha_api.parse_time(d['time']))
found_current_dim_value = False
for index, timeset in enumerate(sorted_timesets):
self._ha_api.run_daily(self.update_screensaver_brightness, timeset["time"], value=timeset["value"])
LOGGER.info("Current time %s", self._ha_api.get_now().time())
if self._ha_api.parse_time(timeset["time"]) > self._ha_api.get_now().time() and not found_current_dim_value:
# first time after current time, set dim value
self.current_screensaver_brightness = sorted_timesets[index-1]["value"]
LOGGER.info("Setting dim value to %s", sorted_timesets[index-1])
found_current_dim_value = True
# still no dim value
if not found_current_dim_value:
self.current_screensaver_brightness = sorted_timesets[-1]["value"]
# send screensaver brightness in case config has changed
self.update_screensaver_brightness(kwargs={"value": self.current_screensaver_brightness})
def startup(self):
LOGGER.info(f"Startup Event")
# send time and date on startup
self._pages_gen.update_time("")
self._pages_gen.update_date("")
# send panel to screensaver
self._pages_gen.page_type("screensaver")
self.weather_update("")
def update_screensaver_brightness(self, kwargs):
self.current_screensaver_brightness = kwargs['value']
self._send_mqtt_msg(f"dimmode,{self.current_screensaver_brightness}")
def weather_update(self, kwargs):
we_name = self._config.get("weather")
unit = "°C"
self._pages_gen.update_screensaver_weather(kwargs={"weather": we_name, "unit": unit})
def register_callbacks(self):
items = self._config.get_root_page().get_all_items_recursive()
LOGGER.info(f"Registering callbacks for the following items: {items}")
for item in items:
if self._ha_api.entity_exists(item):
self._ha_api.listen_state(self.state_change_callback, entity_id=item, attribute="all")
def state_change_callback(self, entity, attribute, old, new, kwargs):
LOGGER.info(f"Got callback for: {entity}")
if entity in self._current_page.get_items():
self._pages_gen.render_page(self._current_page)
# send detail page update, just in case
if self._current_page.type in ["cardGrid", "cardEntities"]:
if entity.startswith("light"):
self._pages_gen.generate_light_detail_page(entity)
if entity.startswith("switch"):
self._pages_gen.generate_shutter_detail_page(entity)
def detail_open(self, detail_type, entity_id):
if detail_type == "popupShutter":
self._pages_gen.generate_shutter_detail_page(entity_id)
if detail_type == "popupLight":
self._pages_gen.generate_light_detail_page(entity_id)
def button_press(self, entity_id, button_type, value):
LOGGER.debug(f"Button Press Event; entity_id: {entity_id}; button_type: {button_type}; value: {value} ")
# internal buttons
if entity_id == "screensaver" and button_type == "enter":
self._pages_gen.render_page(self._current_page)
if button_type == "bExit":
self._pages_gen.render_page(self._current_page)
if button_type == "bNext":
self._current_page = self._current_page.next()
self._pages_gen.render_page(self._current_page)
if button_type == "bPrev":
self._current_page = self._current_page.prev()
self._pages_gen.render_page(self._current_page)
elif entity_id == "updateDisplayNoYes" and value == "no":
self._pages_gen.render_page(self._current_page)
# buttons with actions on HA
if button_type == "OnOff":
if value == "1":
self._ha_api.turn_on(entity_id)
else:
self._ha_api.turn_off(entity_id)
# for shutter / covers
if button_type == "up":
self._ha_api.get_entity(entity_id).call_service("open_cover")
if button_type == "stop":
self._ha_api.get_entity(entity_id).call_service("stop_cover")
if button_type == "down":
self._ha_api.get_entity(entity_id).call_service("close_cover")
if button_type == "positionSlider":
pos = int(value)
self._ha_api.get_entity(entity_id).call_service("set_cover_position", position=pos)
if button_type == "button":
if entity_id.startswith('scene'):
self._ha_api.get_entity(entity_id).call_service("turn_on")
elif entity_id.startswith('light') or entity_id.startswith('switch') or entity_id.startswith('input_boolean'):
self._ha_api.get_entity(entity_id).call_service("toggle")
else:
self._ha_api.get_entity(entity_id).call_service("press")
# for media page
if button_type == "media-next":
self._ha_api.get_entity(entity_id).call_service("media_next_track")
if button_type == "media-back":
self._ha_api.get_entity(entity_id).call_service("media_previous_track")
if button_type == "media-pause":
self._ha_api.get_entity(entity_id).call_service("media_play_pause")
if button_type == "hvac_action":
self._ha_api.get_entity(entity_id).call_service("set_hvac_mode", hvac_mode=value)
if button_type == "volumeSlider":
pos = int(value)
# HA wants this value between 0 and 1 as float
pos = pos/100
self._ha_api.get_entity(entity_id).call_service("volume_set", volume_level=pos)
# for light detail page
if button_type == "brightnessSlider":
# scale 0-100 to ha brightness range
brightness = int(scale(int(value),(0,100),(0,255)))
self._ha_api.get_entity(entity_id).call_service("turn_on", brightness=brightness)
if button_type == "colorTempSlider":
entity = self._ha_api.get_entity(entity_id)
#scale 0-100 from slider to color range of lamp
color_val = scale(int(value), (0, 100), (entity.attributes.min_mireds, entity.attributes.max_mireds))
self._ha_api.get_entity(entity_id).call_service("turn_on", color_temp=color_val)
if button_type == "colorWheel":
self._ha_api.log(value)
value = value.split('|')
color = pos_to_color(int(value[0]), int(value[1]))
self._ha_api.log(color)
self._ha_api.get_entity(entity_id).call_service("turn_on", rgb_color=color)
# for climate page
if button_type == "tempUpd":
temp = int(value)/10
self._ha_api.get_entity(entity_id).call_service("set_temperature", temperature=temp)

View File

@@ -0,0 +1,40 @@
import colorsys
import math
def scale(val, src, dst):
"""
Scale the given value from the scale of src to the scale of dst.
"""
return ((val - src[0]) / (src[1]-src[0])) * (dst[1]-dst[0]) + dst[0]
def hsv2rgb(h, s, v):
hsv = colorsys.hsv_to_rgb(h,s,v)
return tuple(round(i * 255) for i in hsv)
def pos_to_color(x, y):
r = 160/2
x = round((x - r) / r * 100) / 100
y = round((r - y) / r * 100) / 100
r = math.sqrt(x*x + y*y)
sat = 0
if (r > 1):
sat = 0
else:
sat = r
hsv = (math.degrees(math.atan2(y, x))%360/360, sat, 1)
rgb = hsv2rgb(hsv[0],hsv[1],hsv[2])
return rgb
def rgb_brightness(rgb_color, brightness):
red = rgb_color[0]/255*brightness
green = rgb_color[1]/255*brightness
blue = rgb_color[2]/255*brightness
return [red, green, blue]
def rgb_dec565(rgb_color):
red = rgb_color[0]
green = rgb_color[1]
blue = rgb_color[2]
# take in the red, green and blue values (0-255) as 8 bit values and then combine
# and shift them to make them a 16 bit dec value in 565 format.
return ((int(red / 255 * 31) << 11) | (int(green / 255 * 63) << 5) | (int(blue / 255 * 31)))

View File

@@ -0,0 +1,48 @@
icons = {
'alert-circle-outline': 0,
'lightbulb': 1,
'thermometer': 2,
'gesture-tap-button': 3,
'flash': 4,
'music': 5,
'check-circle-outline': 6,
'close-circle-outline': 7,
'pause': 8,
'play': 9,
'palette': 10,
'window-open': 11,
'weather-cloudy': 12,
'weather-fog': 13,
'weather-hail': 14,
'weather-lightning': 15,
'weather-lightning-rainy': 16,
'weather-night': 17,
'weather-partly-cloudy': 18,
'weather-pouring': 19,
'weather-rainy': 20,
'weather-snowy': 21,
'weather-snowy-rainy': 22,
'weather-sunny': 23,
'weather-windy': 24,
'weather-windy-variant': 25,
'water-percent': 26,
'power': 27,
'fire': 28,
'calendar-sync': 29,
'fan': 30,
'snowflake': 31,
'solar-power': 32,
'battery-charging-medium': 33,
'battery-medium': 34,
'shield-home': 35,
'door-open': 36,
'door-closed': 37,
'window-closed': 38,
}
def get_icon_id(ma_name):
if ma_name in icons:
return icons[ma_name]
else:
return icons["alert-circle-outline"]

View File

@@ -0,0 +1,64 @@
from icon_mapping import get_icon_id
weather_mapping = {
'clear-night': 'weather-night',
'cloudy': 'weather-cloudy',
'exceptional': 'alert-circle-outline',
'fog': 'weather-fog',
'hail': 'weather-hail',
'lightning': 'weather-lightning',
'lightning-rainy': 'weather-lightning-rainy',
'partlycloudy': 'weather-partly-cloudy',
'pouring': 'weather-pouring',
'rainy': 'weather-rainy',
'snowy': 'weather-snowy',
'snowy-rainy': 'weather-snowy-rainy',
'sunny': 'weather-sunny',
'windy': 'weather-windy',
'windy-variant': 'weather-windy-variant'
}
sensor_mapping_on = {
"door": "door-open",
}
sensor_mapping_off = {
"door": "door-closed",
}
sensor_mapping = {
"temperature": "thermometer",
"power": "flash"
}
def map_to_mdi_name(ha_type, state=None, device_class=None):
if ha_type == "weather":
return weather_mapping[state] if state in weather_mapping else "alert-circle-outline"
if ha_type == "button":
return "gesture-tap-button"
if ha_type == "scene":
return "palette"
if ha_type == "switch":
return "flash"
if ha_type == "light":
return "lightbulb"
if ha_type == "input_boolean":
return "check-circle-outline" if state == "on" else "close-circle-outline"
if ha_type == "cover":
return "window-open" if state == "open" else "window-closed"
elif ha_type == "sensor":
if state == "on":
return sensor_mapping_on[device_class] if device_class in sensor_mapping_on else "alert-circle-outline"
elif state == "off":
return sensor_mapping_off[device_class] if device_class in sensor_mapping_off else "alert-circle-outline"
else:
return sensor_mapping[device_class] if device_class in sensor_mapping else "alert-circle-outline"
return "alert-circle-outline"
def get_icon_id_ha(ha_name, state=None, device_class=None, overwrite=None):
if overwrite is not None:
return get_icon_id(overwrite)
return get_icon_id(map_to_mdi_name(ha_name, state, device_class))

View File

@@ -0,0 +1,12 @@
translations = {
'de_DE': {
'ACTIVATE': "AKTIVIEREN",
'PRESS': "DRÜCKEN",
}
}
def get_translation(locale, input):
if locale in translations:
return translations.get(locale).get(input, input)
else:
return input

View File

@@ -0,0 +1,53 @@
import json
import logging
LOGGER = logging.getLogger(__name__)
class LuiMqttListener(object):
def __init__(self, mqtt_api, topic, controller, updater):
self._controller = controller
self._updater = updater
# Setup, mqtt subscription and callback
mqtt_api.mqtt_subscribe(topic=topic)
mqtt_api.listen_event(self.mqtt_event_callback, "MQTT_MESSAGE", topic=topic, namespace='mqtt')
def mqtt_event_callback(self, event_name, data, kwargs):
LOGGER.info(f'MQTT callback for: {data}')
# Parse Json Message from Tasmota and strip out message from nextion display
data = json.loads(data["payload"])
if("nlui_driver_version" in data):
msg = data["nlui_driver_version"]
self._updater.set_tasmota_driver_version(int(msg))
if("CustomRecv" not in data):
return
msg = data["CustomRecv"]
LOGGER.info(f"Received Message from Screen: {msg}")
# Split message into parts seperated by ","
msg = msg.split(",")
# run action based on received command
if msg[0] == "event":
if msg[1] == "startup":
display_firmware_version = int(msg[2])
self._updater.set_current_display_firmware_version(display_firmware_version)
# check for updates
msg_send = self._updater.check_updates()
# send messages for current page
if not msg_send:
self._controller.startup()
if msg[1] == "screensaverOpen":
self._controller.weather_update("")
if msg[1] == "buttonPress2":
entity_id = msg[2]
btype = msg[3]
value = msg[4] if len(msg) > 4 else None
if entity_id == "updateDisplayNoYes" and value == "yes":
self._updater.update_panel_driver()
self._controller.button_press(entity_id, btype, value)
if msg[1] == "pageOpenDetail":
self._controller.detail_open(msg[2], msg[3])

View File

@@ -0,0 +1,277 @@
import logging
import datetime
from icon_mapping import get_icon_id
from icons import get_icon_id_ha
from helper import scale, rgb_dec565, rgb_brightness
from localization import get_translation
# check Babel
import importlib
babel_spec = importlib.util.find_spec("babel")
if babel_spec is not None:
import babel.dates
LOGGER = logging.getLogger(__name__)
class LuiPagesGen(object):
def __init__(self, ha_api, config, send_mqtt_msg):
self._ha_api = ha_api
self._config = config
self._locale = config.get("locale")
self._send_mqtt_msg = send_mqtt_msg
def get_entity_color(self, entity):
attr = entity.attributes
default_color_on = rgb_dec565([253, 216, 53])
default_color_off = rgb_dec565([68, 115, 158])
icon_color = default_color_on if entity.state == "on" else default_color_off
if "rgb_color" in attr:
color = attr.rgb_color
if "brightness" in attr:
color = rgb_brightness(color, attr.brightness)
icon_color = rgb_dec565(color)
elif "brightness" in attr:
color = rgb_brightness([253, 216, 53], attr.brightness)
icon_color = rgb_dec565(color)
return icon_color
def update_time(self, kwargs):
time = datetime.datetime.now().strftime(self._config.get("timeFormat"))
self._send_mqtt_msg(f"time,{time}")
def update_date(self, kwargs):
global babel_spec
if babel_spec is not None:
dateformat = self._config.get("dateFormatBabel")
date = babel.dates.format_date(datetime.datetime.now(), dateformat, locale=self._locale)
else:
dateformat = self._config.get("dateFormat")
date = datetime.datetime.now().strftime(dateformat)
self._send_mqtt_msg(f"date,?{date}")
def page_type(self, target_page):
self._send_mqtt_msg(f"pageType,{target_page}")
def update_screensaver_weather(self, kwargs):
we_name = kwargs['weather']
unit = kwargs['unit']
if self._ha_api.entity_exists(we_name):
we = self._ha_api.get_entity(we_name)
else:
LOGGER.error("Skipping Weather Update, entitiy not found")
return
icon_cur = get_icon_id_ha("weather", state=we.state)
text_cur = f"{we.attributes.temperature}{unit}"
icon_cur_detail = get_icon_id("water-percent")
text_cur_detail = f"{we.attributes.humidity} %"
up1 = we.attributes.forecast[0]['datetime']
up1 = datetime.datetime.fromisoformat(up1)
icon1 = get_icon_id_ha("weather", state=we.attributes.forecast[0]['condition'])
down1 = we.attributes.forecast[0]['temperature']
up2 = we.attributes.forecast[1]['datetime']
up2 = datetime.datetime.fromisoformat(up2)
icon2 = get_icon_id_ha("weather", state=we.attributes.forecast[1]['condition'])
down2 = we.attributes.forecast[1]['temperature']
global babel_spec
if babel_spec is not None:
up1 = babel.dates.format_date(up1, "E", locale=self._locale)
up2 = babel.dates.format_date(up2, "E", locale=self._locale)
else:
up1 = up1.strftime("%a")
up2 = up2.strftime("%a")
self._send_mqtt_msg(f"weatherUpdate,?{icon_cur}?{text_cur}?{icon_cur_detail}?{text_cur_detail}?{up1}?{icon1}?{down1}?{up2}?{icon2}?{down2}")
def generate_entities_item(self, item):
icon = None
name = None
if type(item) is dict:
icon = next(iter(item.items()))[1].get('icon')
name = next(iter(item.items()))[1].get('name')
item = next(iter(item.items()))[0]
# type of the item is the string before the "." in the item name
item_type = item.split(".")[0]
LOGGER.info(f"Generating item command for {item} with type {item_type}",)
# Internal Entities
if item_type == "delete":
return f",{item_type},,,,,"
if item_type == "navigate":
text = get_translation(self._locale,"PRESS")
return f",button,{item},0,17299,{item},{text}"
if not self._ha_api.entity_exists(item):
return f",text,{item},{get_icon_id('alert-circle-outline')},17299,Not found check, apps.yaml"
# HA Entities
entity = self._ha_api.get_entity(item)
name = name if name is not None else entity.attributes.friendly_name
if item_type == "cover":
icon_id = get_icon_id_ha("cover", state=entity.state, overwrite=icon)
return f",shutter,{item},{icon_id},17299,{name},"
if item_type in "light":
switch_val = 1 if entity.state == "on" else 0
icon_color = self.get_entity_color(entity)
icon_id = get_icon_id_ha("light", overwrite=icon)
return f",{item_type},{item},{icon_id},{icon_color},{name},{switch_val}"
if item_type in ["switch", "input_boolean"]:
switch_val = 1 if entity.state == "on" else 0
icon_color = self.get_entity_color(entity)
icon_id = get_icon_id_ha(item_type, state=entity.state, overwrite=icon)
return f",switch,{item},{icon_id},{icon_color},{name},{switch_val}"
if item_type in ["sensor", "binary_sensor"]:
device_class = entity.attributes.get("device_class", "")
icon_id = get_icon_id_ha("sensor", state=entity.state, device_class=device_class, overwrite=icon)
unit_of_measurement = entity.attributes.get("unit_of_measurement", "")
value = entity.state + " " + unit_of_measurement
icon_color = self.get_entity_color(entity)
return f",text,{item},{icon_id},{icon_color},{name},{value}"
if item_type in ["button", "input_button"]:
icon_id = get_icon_id_ha("button", overwrite=icon)
return f",button,{item},{icon_id},17299,{name},PRESS"
if item_type == "scene":
icon_id = get_icon_id_ha("scene", overwrite=icon)
text = get_translation(self._locale,"PRESS")
return f",button,{item},{icon_id},17299,{name},{text}"
def generate_entities_page(self, heading, items):
# Set Heading of Page
self._send_mqtt_msg(f"entityUpdHeading,{heading}")
# Get items and construct cmd string
command = "entityUpd"
for item in items:
command += self.generate_entities_item(item)
self._send_mqtt_msg(command)
def generate_thermo_page(self, item):
if not self._ha_api.entity_exists(item):
command = f"entityUpd,{item},Not found,220,220,Not found,150,300,5"
else:
entity = self._ha_api.get_entity(item)
heading = entity.attributes.friendly_name
current_temp = int(entity.attributes.get("current_temperature", 0)*10)
dest_temp = int(entity.attributes.get("temperature", 0)*10)
status = entity.attributes.get("hvac_action", "")
min_temp = int(entity.attributes.get("min_temp", 0)*10)
max_temp = int(entity.attributes.get("max_temp", 0)*10)
step_temp = int(entity.attributes.get("target_temp_step", 0.5)*10)
icon_res = ""
hvac_modes = entity.attributes.get("hvac_modes", [])
for mode in hvac_modes:
icon_id = get_icon_id('alert-circle-outline')
color_on = 64512
if mode == "auto":
icon_id = get_icon_id("calendar-sync")
color_on = 1024
if mode == "heat":
icon_id = get_icon_id("fire")
color_on = 64512
if mode == "off":
icon_id = get_icon_id("power")
color_on = 35921
if mode == "cool":
icon_id = get_icon_id("snowflake")
color_on = 11487
if mode == "dry":
icon_id = get_icon_id("water-percent")
color_on = 60897
if mode == "fan_only":
icon_id = get_icon_id("fan")
color_on = 35921
state = 0
if(mode == entity.state):
state = 1
icon_res += f",{icon_id},{color_on},{state},{mode}"
len_hvac_modes = len(hvac_modes)
if len_hvac_modes%2 == 0:
# even
padding_len = int((4-len_hvac_modes)/2)
icon_res = ","*4*padding_len + icon_res + ","*4*padding_len
# use last 4 icons
icon_res = ","*4*5 + icon_res
else:
# uneven
padding_len = int((5-len_hvac_modes)/2)
icon_res = ","*4*padding_len + icon_res + ","*4*padding_len
# use first 5 icons
icon_res = icon_res + ","*4*4
command = f"entityUpd,{item},{heading},{current_temp},{dest_temp},{status},{min_temp},{max_temp},{step_temp}{icon_res}"
self._send_mqtt_msg(command)
def generate_media_page(self, item):
if not self._ha_api.entity_exists(item):
command = f"entityUpd,|{item}|Not found|{get_icon_id('alert-circle-outline')}|Please check your|apps.yaml in AppDaemon|50|{get_icon_id('alert-circle-outline')}"
else:
entity = self._ha_api.get_entity(item)
heading = entity.attributes.friendly_name
icon = 0
title = entity.attributes.get("media_title", "")
author = entity.attributes.get("media_artist", "")
volume = int(entity.attributes.get("volume_level", 0)*100)
iconplaypause = get_icon_id("pause") if entity.state == "playing" else get_icon_id("play")
if "media_content_type" in entity.attributes:
if entity.attributes.media_content_type == "music":
icon = get_icon_id("music")
command = f"entityUpd,|{item}|{heading}|{icon}|{title}|{author}|{volume}|{iconplaypause}"
self._send_mqtt_msg(command)
def render_page(self, page):
LOGGER.info(page)
config = page.data
page_type = config["type"]
LOGGER.info(f"Started rendering of page x with type {page_type}")
# Switch to page
self.page_type(page_type)
if page_type in ["cardEntities", "cardGrid"]:
heading = config.get("heading", "unknown")
self.generate_entities_page(heading, page.get_items())
return
if page_type == "cardThermo":
LOGGER.info(page.data)
self.generate_thermo_page(page.data.get("item"))
if page_type == "cardMedia":
LOGGER.info(page.data)
self.generate_media_page(page.data.get("item"))
def generate_light_detail_page(self, entity):
entity = self._ha_api.get_entity(entity)
switch_val = 1 if entity.state == "on" else 0
icon_color = self.get_entity_color(entity)
brightness = "disable"
color_temp = "disable"
color = "disable"
# scale 0-255 brightness from ha to 0-100
if entity.state == "on":
if "brightness" in entity.attributes:
brightness = int(scale(entity.attributes.brightness,(0,255),(0,100)))
else:
brightness = "disable"
if "color_temp" in entity.attributes.supported_color_modes:
if "color_temp" in entity.attributes:
# scale ha color temp range to 0-100
color_temp = int(scale(entity.attributes.color_temp,(entity.attributes.min_mireds, entity.attributes.max_mireds),(0,100)))
else:
color_temp = "unknown"
else:
color_temp = "disable"
list_color_modes = ["xy", "rgb", "rgbw", "hs"]
if any(item in list_color_modes for item in entity.attributes.supported_color_modes):
color = "enable"
else:
color = "disable"
self._send_mqtt_msg(f"entityUpdateDetail,{get_icon_id('lightbulb')},{icon_color},{switch_val},{brightness},{color_temp},{color}")
def generate_shutter_detail_page(self, entity):
pos = 100-int(entity.attributes.get("current_position", 50))
self._send_mqtt_msg(f"entityUpdateDetail,{pos}")
def send_message_page(self, id, heading, msg, b1, b2):
self._send_mqtt_msg(f"pageType,popupNotify")
self._send_mqtt_msg(f"entityUpdateDetail,|{id}|{heading}|65535|{b1}|65535|{b2}|65535|{msg}|65535|0")

View File

@@ -0,0 +1,77 @@
import logging
LOGGER = logging.getLogger(__name__)
class Updater:
def __init__(self, send_mqtt_msg, topic_send, mode, desired_display_firmware_version, desired_display_firmware_url, desired_tasmota_driver_version, desired_tasmota_driver_url):
self.desired_display_firmware_version = desired_display_firmware_version
self.desired_display_firmware_url = desired_display_firmware_url
self.desired_tasmota_driver_version = desired_tasmota_driver_version
self.desired_tasmota_driver_url = desired_tasmota_driver_url
self.mode = mode
self._send_mqtt_msg = send_mqtt_msg
self.topic_send = topic_send
self.current_tasmota_driver_version = None
self.current_display_firmware_version = None
def set_tasmota_driver_version(self, driver_version):
self.current_tasmota_driver_version = driver_version
def set_current_display_firmware_version(self, panel_version):
self.current_display_firmware_version = panel_version
def check_pre_req(self):
# we need to know both versions to continue
if self.current_tasmota_driver_version is not None and self.current_display_firmware_version is not None:
# tasmota driver has to be at least version 2 for Update command
# and panel has to be at version 11 for notify commands
# version 16 for new button cmd format
if self.current_tasmota_driver_version >= 2 and self.current_display_firmware_version >= 16:
return True
return False
def send_message_page(self, id, heading, msg, b1, b2):
self._send_mqtt_msg(f"pageType,popupNotify")
self._send_mqtt_msg(f"entityUpdateDetail,|{id}|{heading}|65535|{b1}|65535|{b2}|65535|{msg}|65535|0")
def check_updates(self):
# return's true if a notification was send to the panel
# run pre req check
if self.check_pre_req():
LOGGER.info("Update Pre-Check sucessful Tasmota Driver Version: %s Panel Version: %s", self.current_tasmota_driver_version, self.current_display_firmware_version)
# check if tasmota driver needs update
if self.current_tasmota_driver_version < self.desired_tasmota_driver_version:
LOGGER.info("Update of Tasmota Driver needed")
# in auto mode just do the update
if self.mode == "auto":
self.update_berry_driver()
return False
# send notification about the update
if self.mode == "auto-notify":
update_msg = "There's an update avalible for the tasmota berry driver, do you want to start the update now? If you encounter issues after the update or this message appears frequently, please checkthe manual and repeat the installation steps for the tasmota berry driver. "
self.send_message_page("updateBerryNoYes", "Driver Update available!", update_msg, "Dismiss", "Yes")
return True
return False
# check if display firmware needs an update
if self.current_display_firmware_version < self.desired_display_firmware_version:
LOGGER.info("Update of Display Firmware needed")
# in auto mode just do the update
if self.mode == "auto":
self.update_panel_driver()
return False
# send notification about the update
if self.mode == "auto-notify":
update_msg = "There's a firmware update avalible for the nextion sceen inside of nspanel, do you want to start the update now? If the update fails check the installation manual and flash again over the tasmota console. Be pationed the update will take a while."
self.send_message_page("updateDisplayNoYes", "Display Update available!", update_msg, "Dismiss", "Yes")
return True
return False
else:
LOGGER.info("Update Pre-Check failed Tasmota Driver Version: %s Panel Version: %s", self.current_tasmota_driver_version, self.current_display_firmware_version)
return False
def update_berry_driver(self):
topic = self.topic_send.replace("CustomSend", "UpdateDriverVersion")
self._send_mqtt_msg(self.desired_tasmota_driver_url, topic=topic)
def update_panel_driver(self):
topic = self.topic_send.replace("CustomSend", "FlashNextion")
self._send_mqtt_msg(self.desired_display_firmware_url, topic=topic)