Blinking added with Light-Effects in Home Assistant

This commit is contained in:
2026-04-15 22:05:55 +02:00
parent 4a7551e358
commit d9d17e5e5c

194
gauge.py
View File

@@ -223,6 +223,15 @@ def set_status_led(gauge_idx, led_type, on):
_set_led(gauge_idx, _LED_STATUS_GREEN, r, g, b)
def _apply_blink_or_led(gauge_idx, led_idx, color, effect):
"""Set LED to color; if effect is a known blink preset, start blinking."""
r, g, b = color
arduino_send(f"LED {gauge_idx} {led_idx} {r} {g} {b}")
if effect in _BLINK_PRESETS:
on_ms, off_ms = _BLINK_PRESETS[effect]
arduino_send(f"BLINK {gauge_idx} {led_idx} {on_ms} {off_ms}")
# ---------------------------------------------------------------------------
# State tracking (for MQTT state publishing)
# ---------------------------------------------------------------------------
@@ -240,6 +249,19 @@ _BL_SAVE_DELAY_MS = 5000
client_ref = None
_mqtt_connected = False
_BLINK_PRESETS = {
"Blink Slow": (800, 800),
"Blink Fast": (150, 150),
"Blink Alert": (100, 400),
}
_EFFECT_LIST = list(_BLINK_PRESETS.keys())
_red_effect = [None] * num_gauges
_green_effect = [None] * num_gauges
_status_red_effect = [None] * num_gauges
_status_green_effect= [None] * num_gauges
_bl_effect = [None] * num_gauges
def _backlight_changed(gauge_idx, new_color, new_on, new_brightness):
return (
@@ -295,6 +317,8 @@ def publish_backlight_states(client):
"brightness": int(brightness * 2.55),
"color": {"r": r, "g": g, "b": b},
}
if _bl_effect[i]:
state["effect"] = _bl_effect[i]
try:
client.publish(gt["led_bl_state"], ujson.dumps(state), retain=True)
except Exception as e:
@@ -344,15 +368,15 @@ def make_gauge_topics(prefix, gauge_id):
"led_red_state": f"{prefix}/gauge{gauge_id}/led/red/state",
"led_green_state": f"{prefix}/gauge{gauge_id}/led/green/state",
"led_bl_state": f"{prefix}/gauge{gauge_id}/led/backlight/state",
"led_red_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_red/config",
"led_green_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_green/config",
"led_red_disc": f"homeassistant/light/{MQTT_CLIENT_ID}_g{gauge_id}_red/config",
"led_green_disc": f"homeassistant/light/{MQTT_CLIENT_ID}_g{gauge_id}_green/config",
"led_bl_disc": f"homeassistant/light/{MQTT_CLIENT_ID}_g{gauge_id}_bl/config",
"status_red": f"{prefix}/gauge{gauge_id}/status_led/red/set",
"status_green": f"{prefix}/gauge{gauge_id}/status_led/green/set",
"status_red_state": f"{prefix}/gauge{gauge_id}/status_led/red/state",
"status_green_state": f"{prefix}/gauge{gauge_id}/status_led/green/state",
"status_red_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_status_red/config",
"status_green_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_status_green/config",
"status_red_disc": f"homeassistant/light/{MQTT_CLIENT_ID}_g{gauge_id}_status_red/config",
"status_green_disc": f"homeassistant/light/{MQTT_CLIENT_ID}_g{gauge_id}_status_green/config",
}
@@ -464,27 +488,50 @@ def on_message(topic, payload):
return
if topic == gt["led_red"]:
state = payload.upper() == "ON"
set_red_led(i, state)
_publish(gt["led_red_state"], "ON" if state else "OFF", retain=True)
info(f"Gauge {i} red LED → {'ON' if state else 'OFF'}")
try:
data = ujson.loads(payload)
except:
data = {"state": payload}
state_on = data.get("state", "ON").upper() != "OFF"
effect = data.get("effect") if state_on else None
if effect not in _BLINK_PRESETS:
effect = None
_red_effect[i] = effect
color = gauges[i]["ws2812_red"] if state_on else (0, 0, 0)
_apply_blink_or_led(i, _LED_RED, color, effect)
pub = {"state": "ON" if state_on else "OFF"}
if effect:
pub["effect"] = effect
_publish(gt["led_red_state"], ujson.dumps(pub), retain=True)
info(f"Gauge {i} red LED → {'ON' if state_on else 'OFF'}{' [' + effect + ']' if effect else ''}")
return
if topic == gt["led_green"]:
state = payload.upper() == "ON"
set_green_led(i, state)
_publish(gt["led_green_state"], "ON" if state else "OFF", retain=True)
info(f"Gauge {i} green LED → {'ON' if state else 'OFF'}")
try:
data = ujson.loads(payload)
except:
data = {"state": payload}
state_on = data.get("state", "ON").upper() != "OFF"
effect = data.get("effect") if state_on else None
if effect not in _BLINK_PRESETS:
effect = None
_green_effect[i] = effect
color = gauges[i]["ws2812_green"] if state_on else (0, 0, 0)
_apply_blink_or_led(i, _LED_GREEN, color, effect)
pub = {"state": "ON" if state_on else "OFF"}
if effect:
pub["effect"] = effect
_publish(gt["led_green_state"], ujson.dumps(pub), retain=True)
info(f"Gauge {i} green LED → {'ON' if state_on else 'OFF'}{' [' + effect + ']' if effect else ''}")
return
if topic == gt["led_bl"]:
try:
data = ujson.loads(payload)
if data.get("state", "ON").upper() == "OFF":
_bl_effect[i] = None
set_backlight_brightness(i, 0)
_publish(
gt["led_bl_state"], ujson.dumps({"state": "OFF"}), retain=True
)
_publish(gt["led_bl_state"], ujson.dumps({"state": "OFF"}), retain=True)
info(f"Gauge {i} backlight → OFF")
return
color = data.get("color", {})
@@ -498,9 +545,22 @@ def on_message(topic, payload):
brightness = backlight_brightness[i]
else:
brightness = 100
effect = data.get("effect")
if effect not in _BLINK_PRESETS:
effect = None
except Exception as e:
warn(f"Invalid backlight payload for gauge {i}: '{payload}' ({e})")
return
_bl_effect[i] = effect
if effect:
set_backlight(i, r, g, b, brightness)
backlight_color[i] = (r, g, b)
backlight_brightness[i] = brightness
backlight_on[i] = True
on_ms, off_ms = _BLINK_PRESETS[effect]
arduino_send(f"BLINK {i} {_LED_BACKLIGHT_RANGE} {on_ms} {off_ms}")
_mark_bl_dirty()
else:
set_backlight_color(i, r, g, b, brightness)
state = {
"state": "ON",
@@ -508,22 +568,48 @@ def on_message(topic, payload):
"brightness": int(brightness * 2.55),
"color": {"r": r, "g": g, "b": b},
}
if effect:
state["effect"] = effect
_publish(gt["led_bl_state"], ujson.dumps(state), retain=True)
info(f"Gauge {i} backlight → #{r:02x}{g:02x}{b:02x} @ {brightness}%")
info(f"Gauge {i} backlight → #{r:02x}{g:02x}{b:02x} @ {brightness}%{' [' + effect + ']' if effect else ''}")
return
if topic == gt["status_red"]:
state = payload.upper() == "ON"
set_status_led(i, "red", state)
_publish(gt["status_red_state"], "ON" if state else "OFF", retain=True)
info(f"Gauge {i} status red → {'ON' if state else 'OFF'}")
try:
data = ujson.loads(payload)
except:
data = {"state": payload}
state_on = data.get("state", "ON").upper() != "OFF"
effect = data.get("effect") if state_on else None
if effect not in _BLINK_PRESETS:
effect = None
_status_red_effect[i] = effect
color = gauges[i]["ws2812_red"] if state_on else (0, 0, 0)
_apply_blink_or_led(i, _LED_STATUS_RED, color, effect)
pub = {"state": "ON" if state_on else "OFF"}
if effect:
pub["effect"] = effect
_publish(gt["status_red_state"], ujson.dumps(pub), retain=True)
info(f"Gauge {i} status red → {'ON' if state_on else 'OFF'}{' [' + effect + ']' if effect else ''}")
return
if topic == gt["status_green"]:
state = payload.upper() == "ON"
set_status_led(i, "green", state)
_publish(gt["status_green_state"], "ON" if state else "OFF", retain=True)
info(f"Gauge {i} status green → {'ON' if state else 'OFF'}")
try:
data = ujson.loads(payload)
except:
data = {"state": payload}
state_on = data.get("state", "ON").upper() != "OFF"
effect = data.get("effect") if state_on else None
if effect not in _BLINK_PRESETS:
effect = None
_status_green_effect[i] = effect
color = gauges[i]["ws2812_green"] if state_on else (0, 0, 0)
_apply_blink_or_led(i, _LED_STATUS_GREEN, color, effect)
pub = {"state": "ON" if state_on else "OFF"}
if effect:
pub["effect"] = effect
_publish(gt["status_green_state"], ujson.dumps(pub), retain=True)
info(f"Gauge {i} status green → {'ON' if state_on else 'OFF'}{' [' + effect + ']' if effect else ''}")
return
if topic == T_ZERO:
@@ -658,6 +744,24 @@ def publish_discovery(client):
"""Publish all HA MQTT discovery payloads for gauges and LEDs."""
_dev_ref = _DEVICE
# Clear any previously registered switch entities (migration to light).
for i in range(num_gauges):
for old_t in [
f"homeassistant/switch/{MQTT_CLIENT_ID}_g{i}_red/config",
f"homeassistant/switch/{MQTT_CLIENT_ID}_g{i}_green/config",
f"homeassistant/switch/{MQTT_CLIENT_ID}_g{i}_status_red/config",
f"homeassistant/switch/{MQTT_CLIENT_ID}_g{i}_status_green/config",
]:
client.publish(old_t, b"", retain=True)
_indicator_light = {
"schema": "json",
"supported_color_modes": ["onoff"],
"effect": True,
"effect_list": _EFFECT_LIST,
"ret": True,
}
for i, g in enumerate(gauges):
gt = gauge_topics[i]
@@ -688,38 +792,30 @@ def publish_discovery(client):
client.publish(
gt["led_red_disc"],
ujson.dumps(
{
ujson.dumps({
"name": f"{g['name']} Red LED",
"uniq_id": f"{MQTT_CLIENT_ID}_g{i}_red",
"cmd_t": gt["led_red"],
"stat_t": gt["led_red_state"],
"pl_on": "ON",
"pl_off": "OFF",
"icon": "mdi:led-on",
"dev": _dev_ref,
"ret": True,
}
),
**_indicator_light,
}),
retain=True,
)
info(f"Discovery: gauge {i} red LED")
client.publish(
gt["led_green_disc"],
ujson.dumps(
{
ujson.dumps({
"name": f"{g['name']} Green LED",
"uniq_id": f"{MQTT_CLIENT_ID}_g{i}_green",
"cmd_t": gt["led_green"],
"stat_t": gt["led_green_state"],
"pl_on": "ON",
"pl_off": "OFF",
"icon": "mdi:led-on",
"dev": _dev_ref,
"ret": True,
}
),
**_indicator_light,
}),
retain=True,
)
info(f"Discovery: gauge {i} green LED")
@@ -738,6 +834,8 @@ def publish_discovery(client):
"stat_t": gt["led_bl_state"],
"schema": "json",
"supported_color_modes": ["rgb"],
"effect": True,
"effect_list": _EFFECT_LIST,
"icon": "mdi:led-strip",
"dev": _dev_ref,
"ret": True,
@@ -749,38 +847,30 @@ def publish_discovery(client):
client.publish(
gt["status_red_disc"],
ujson.dumps(
{
ujson.dumps({
"name": f"{g['name']} Status Red",
"uniq_id": f"{MQTT_CLIENT_ID}_g{i}_status_red",
"cmd_t": gt["status_red"],
"stat_t": gt["status_red_state"],
"pl_on": "ON",
"pl_off": "OFF",
"icon": "mdi:led-on",
"dev": _dev_ref,
"ret": True,
}
),
**_indicator_light,
}),
retain=True,
)
info(f"Discovery: gauge {i} status red")
client.publish(
gt["status_green_disc"],
ujson.dumps(
{
ujson.dumps({
"name": f"{g['name']} Status Green",
"uniq_id": f"{MQTT_CLIENT_ID}_g{i}_status_green",
"cmd_t": gt["status_green"],
"stat_t": gt["status_green_state"],
"pl_on": "ON",
"pl_off": "OFF",
"icon": "mdi:led-on",
"dev": _dev_ref,
"ret": True,
}
),
**_indicator_light,
}),
retain=True,
)
info(f"Discovery: gauge {i} status green")