diff --git a/gauge.py b/gauge.py index f289ba4..d37edb1 100644 --- a/gauge.py +++ b/gauge.py @@ -223,6 +223,15 @@ def set_status_led(gauge_idx, led_type, on): _set_led(gauge_idx, _LED_STATUS_GREEN, r, g, b) +def _apply_blink_or_led(gauge_idx, led_idx, color, effect): + """Set LED to color; if effect is a known blink preset, start blinking.""" + r, g, b = color + arduino_send(f"LED {gauge_idx} {led_idx} {r} {g} {b}") + if effect in _BLINK_PRESETS: + on_ms, off_ms = _BLINK_PRESETS[effect] + arduino_send(f"BLINK {gauge_idx} {led_idx} {on_ms} {off_ms}") + + # --------------------------------------------------------------------------- # State tracking (for MQTT state publishing) # --------------------------------------------------------------------------- @@ -240,6 +249,19 @@ _BL_SAVE_DELAY_MS = 5000 client_ref = None _mqtt_connected = False +_BLINK_PRESETS = { + "Blink Slow": (800, 800), + "Blink Fast": (150, 150), + "Blink Alert": (100, 400), +} +_EFFECT_LIST = list(_BLINK_PRESETS.keys()) + +_red_effect = [None] * num_gauges +_green_effect = [None] * num_gauges +_status_red_effect = [None] * num_gauges +_status_green_effect= [None] * num_gauges +_bl_effect = [None] * num_gauges + def _backlight_changed(gauge_idx, new_color, new_on, new_brightness): return ( @@ -295,6 +317,8 @@ def publish_backlight_states(client): "brightness": int(brightness * 2.55), "color": {"r": r, "g": g, "b": b}, } + if _bl_effect[i]: + state["effect"] = _bl_effect[i] try: client.publish(gt["led_bl_state"], ujson.dumps(state), retain=True) except Exception as e: @@ -344,15 +368,15 @@ def make_gauge_topics(prefix, gauge_id): "led_red_state": f"{prefix}/gauge{gauge_id}/led/red/state", "led_green_state": f"{prefix}/gauge{gauge_id}/led/green/state", "led_bl_state": f"{prefix}/gauge{gauge_id}/led/backlight/state", - "led_red_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_red/config", - "led_green_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_green/config", + "led_red_disc": f"homeassistant/light/{MQTT_CLIENT_ID}_g{gauge_id}_red/config", + "led_green_disc": f"homeassistant/light/{MQTT_CLIENT_ID}_g{gauge_id}_green/config", "led_bl_disc": f"homeassistant/light/{MQTT_CLIENT_ID}_g{gauge_id}_bl/config", "status_red": f"{prefix}/gauge{gauge_id}/status_led/red/set", "status_green": f"{prefix}/gauge{gauge_id}/status_led/green/set", "status_red_state": f"{prefix}/gauge{gauge_id}/status_led/red/state", "status_green_state": f"{prefix}/gauge{gauge_id}/status_led/green/state", - "status_red_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_status_red/config", - "status_green_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_status_green/config", + "status_red_disc": f"homeassistant/light/{MQTT_CLIENT_ID}_g{gauge_id}_status_red/config", + "status_green_disc": f"homeassistant/light/{MQTT_CLIENT_ID}_g{gauge_id}_status_green/config", } @@ -464,27 +488,50 @@ def on_message(topic, payload): return if topic == gt["led_red"]: - state = payload.upper() == "ON" - set_red_led(i, state) - _publish(gt["led_red_state"], "ON" if state else "OFF", retain=True) - info(f"Gauge {i} red LED → {'ON' if state else 'OFF'}") + try: + data = ujson.loads(payload) + except: + data = {"state": payload} + state_on = data.get("state", "ON").upper() != "OFF" + effect = data.get("effect") if state_on else None + if effect not in _BLINK_PRESETS: + effect = None + _red_effect[i] = effect + color = gauges[i]["ws2812_red"] if state_on else (0, 0, 0) + _apply_blink_or_led(i, _LED_RED, color, effect) + pub = {"state": "ON" if state_on else "OFF"} + if effect: + pub["effect"] = effect + _publish(gt["led_red_state"], ujson.dumps(pub), retain=True) + info(f"Gauge {i} red LED → {'ON' if state_on else 'OFF'}{' [' + effect + ']' if effect else ''}") return if topic == gt["led_green"]: - state = payload.upper() == "ON" - set_green_led(i, state) - _publish(gt["led_green_state"], "ON" if state else "OFF", retain=True) - info(f"Gauge {i} green LED → {'ON' if state else 'OFF'}") + try: + data = ujson.loads(payload) + except: + data = {"state": payload} + state_on = data.get("state", "ON").upper() != "OFF" + effect = data.get("effect") if state_on else None + if effect not in _BLINK_PRESETS: + effect = None + _green_effect[i] = effect + color = gauges[i]["ws2812_green"] if state_on else (0, 0, 0) + _apply_blink_or_led(i, _LED_GREEN, color, effect) + pub = {"state": "ON" if state_on else "OFF"} + if effect: + pub["effect"] = effect + _publish(gt["led_green_state"], ujson.dumps(pub), retain=True) + info(f"Gauge {i} green LED → {'ON' if state_on else 'OFF'}{' [' + effect + ']' if effect else ''}") return if topic == gt["led_bl"]: try: data = ujson.loads(payload) if data.get("state", "ON").upper() == "OFF": + _bl_effect[i] = None set_backlight_brightness(i, 0) - _publish( - gt["led_bl_state"], ujson.dumps({"state": "OFF"}), retain=True - ) + _publish(gt["led_bl_state"], ujson.dumps({"state": "OFF"}), retain=True) info(f"Gauge {i} backlight → OFF") return color = data.get("color", {}) @@ -498,32 +545,71 @@ def on_message(topic, payload): brightness = backlight_brightness[i] else: brightness = 100 + effect = data.get("effect") + if effect not in _BLINK_PRESETS: + effect = None except Exception as e: warn(f"Invalid backlight payload for gauge {i}: '{payload}' ({e})") return - set_backlight_color(i, r, g, b, brightness) + _bl_effect[i] = effect + if effect: + set_backlight(i, r, g, b, brightness) + backlight_color[i] = (r, g, b) + backlight_brightness[i] = brightness + backlight_on[i] = True + on_ms, off_ms = _BLINK_PRESETS[effect] + arduino_send(f"BLINK {i} {_LED_BACKLIGHT_RANGE} {on_ms} {off_ms}") + _mark_bl_dirty() + else: + set_backlight_color(i, r, g, b, brightness) state = { "state": "ON", "color_mode": "rgb", "brightness": int(brightness * 2.55), "color": {"r": r, "g": g, "b": b}, } + if effect: + state["effect"] = effect _publish(gt["led_bl_state"], ujson.dumps(state), retain=True) - info(f"Gauge {i} backlight → #{r:02x}{g:02x}{b:02x} @ {brightness}%") + info(f"Gauge {i} backlight → #{r:02x}{g:02x}{b:02x} @ {brightness}%{' [' + effect + ']' if effect else ''}") return if topic == gt["status_red"]: - state = payload.upper() == "ON" - set_status_led(i, "red", state) - _publish(gt["status_red_state"], "ON" if state else "OFF", retain=True) - info(f"Gauge {i} status red → {'ON' if state else 'OFF'}") + try: + data = ujson.loads(payload) + except: + data = {"state": payload} + state_on = data.get("state", "ON").upper() != "OFF" + effect = data.get("effect") if state_on else None + if effect not in _BLINK_PRESETS: + effect = None + _status_red_effect[i] = effect + color = gauges[i]["ws2812_red"] if state_on else (0, 0, 0) + _apply_blink_or_led(i, _LED_STATUS_RED, color, effect) + pub = {"state": "ON" if state_on else "OFF"} + if effect: + pub["effect"] = effect + _publish(gt["status_red_state"], ujson.dumps(pub), retain=True) + info(f"Gauge {i} status red → {'ON' if state_on else 'OFF'}{' [' + effect + ']' if effect else ''}") return if topic == gt["status_green"]: - state = payload.upper() == "ON" - set_status_led(i, "green", state) - _publish(gt["status_green_state"], "ON" if state else "OFF", retain=True) - info(f"Gauge {i} status green → {'ON' if state else 'OFF'}") + try: + data = ujson.loads(payload) + except: + data = {"state": payload} + state_on = data.get("state", "ON").upper() != "OFF" + effect = data.get("effect") if state_on else None + if effect not in _BLINK_PRESETS: + effect = None + _status_green_effect[i] = effect + color = gauges[i]["ws2812_green"] if state_on else (0, 0, 0) + _apply_blink_or_led(i, _LED_STATUS_GREEN, color, effect) + pub = {"state": "ON" if state_on else "OFF"} + if effect: + pub["effect"] = effect + _publish(gt["status_green_state"], ujson.dumps(pub), retain=True) + info(f"Gauge {i} status green → {'ON' if state_on else 'OFF'}{' [' + effect + ']' if effect else ''}") return if topic == T_ZERO: @@ -658,6 +744,24 @@ def publish_discovery(client): """Publish all HA MQTT discovery payloads for gauges and LEDs.""" _dev_ref = _DEVICE + # Clear any previously registered switch entities (migration to light). + for i in range(num_gauges): + for old_t in [ + f"homeassistant/switch/{MQTT_CLIENT_ID}_g{i}_red/config", + f"homeassistant/switch/{MQTT_CLIENT_ID}_g{i}_green/config", + f"homeassistant/switch/{MQTT_CLIENT_ID}_g{i}_status_red/config", + f"homeassistant/switch/{MQTT_CLIENT_ID}_g{i}_status_green/config", + ]: + client.publish(old_t, b"", retain=True) + + _indicator_light = { + "schema": "json", + "supported_color_modes": ["onoff"], + "effect": True, + "effect_list": _EFFECT_LIST, + "ret": True, + } + for i, g in enumerate(gauges): gt = gauge_topics[i] @@ -688,38 +792,30 @@ def publish_discovery(client): client.publish( gt["led_red_disc"], - ujson.dumps( - { - "name": f"{g['name']} Red LED", - "uniq_id": f"{MQTT_CLIENT_ID}_g{i}_red", - "cmd_t": gt["led_red"], - "stat_t": gt["led_red_state"], - "pl_on": "ON", - "pl_off": "OFF", - "icon": "mdi:led-on", - "dev": _dev_ref, - "ret": True, - } - ), + ujson.dumps({ + "name": f"{g['name']} Red LED", + "uniq_id": f"{MQTT_CLIENT_ID}_g{i}_red", + "cmd_t": gt["led_red"], + "stat_t": gt["led_red_state"], + "icon": "mdi:led-on", + "dev": _dev_ref, + **_indicator_light, + }), retain=True, ) info(f"Discovery: gauge {i} red LED") client.publish( gt["led_green_disc"], - ujson.dumps( - { - "name": f"{g['name']} Green LED", - "uniq_id": f"{MQTT_CLIENT_ID}_g{i}_green", - "cmd_t": gt["led_green"], - "stat_t": gt["led_green_state"], - "pl_on": "ON", - "pl_off": "OFF", - "icon": "mdi:led-on", - "dev": _dev_ref, - "ret": True, - } - ), + ujson.dumps({ + "name": f"{g['name']} Green LED", + "uniq_id": f"{MQTT_CLIENT_ID}_g{i}_green", + "cmd_t": gt["led_green"], + "stat_t": gt["led_green_state"], + "icon": "mdi:led-on", + "dev": _dev_ref, + **_indicator_light, + }), retain=True, ) info(f"Discovery: gauge {i} green LED") @@ -738,6 +834,8 @@ def publish_discovery(client): "stat_t": gt["led_bl_state"], "schema": "json", "supported_color_modes": ["rgb"], + "effect": True, + "effect_list": _EFFECT_LIST, "icon": "mdi:led-strip", "dev": _dev_ref, "ret": True, @@ -749,38 +847,30 @@ def publish_discovery(client): client.publish( gt["status_red_disc"], - ujson.dumps( - { - "name": f"{g['name']} Status Red", - "uniq_id": f"{MQTT_CLIENT_ID}_g{i}_status_red", - "cmd_t": gt["status_red"], - "stat_t": gt["status_red_state"], - "pl_on": "ON", - "pl_off": "OFF", - "icon": "mdi:led-on", - "dev": _dev_ref, - "ret": True, - } - ), + ujson.dumps({ + "name": f"{g['name']} Status Red", + "uniq_id": f"{MQTT_CLIENT_ID}_g{i}_status_red", + "cmd_t": gt["status_red"], + "stat_t": gt["status_red_state"], + "icon": "mdi:led-on", + "dev": _dev_ref, + **_indicator_light, + }), retain=True, ) info(f"Discovery: gauge {i} status red") client.publish( gt["status_green_disc"], - ujson.dumps( - { - "name": f"{g['name']} Status Green", - "uniq_id": f"{MQTT_CLIENT_ID}_g{i}_status_green", - "cmd_t": gt["status_green"], - "stat_t": gt["status_green_state"], - "pl_on": "ON", - "pl_off": "OFF", - "icon": "mdi:led-on", - "dev": _dev_ref, - "ret": True, - } - ), + ujson.dumps({ + "name": f"{g['name']} Status Green", + "uniq_id": f"{MQTT_CLIENT_ID}_g{i}_status_green", + "cmd_t": gt["status_green"], + "stat_t": gt["status_green_state"], + "icon": "mdi:led-on", + "dev": _dev_ref, + **_indicator_light, + }), retain=True, ) info(f"Discovery: gauge {i} status green")