"""
gauge.py — MQTT-based gauge controller for ESP32 / MicroPython

Bridges MQTT commands to an Arduino running the gauge controller firmware
over UART. The Arduino handles all stepper motor motion and LED output.

LED layout per gauge (0-based indices within the Arduino gauge LED segment):
    0-2  backlight (3 LEDs)
    3    red indicator
    4    green indicator
    5    status red
    6    status green

Serial commands follow the Arduino gauge controller protocol (see CLAUDE.md).

Additional config.json fields:
    arduino_uart     — UART peripheral number (default 1)
    arduino_tx_pin   — ESP32 GPIO for TX (default 17)
    arduino_rx_pin   — ESP32 GPIO for RX (default 16)
    arduino_baud     — baud rate (default 115200)
    gauge.max_steps  — full-scale step count on the Arduino (default 4000)
"""

import network
import utime
import ujson
import gc
from umqtt.robust import MQTTClient
from machine import UART


# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------

def _ts():
    """Return the millisecond tick counter formatted as ``HH:MM:SS.mmm``."""
    ms = utime.ticks_ms()
    return f"{(ms // 3600000) % 24:02d}:{(ms // 60000) % 60:02d}:{(ms // 1000) % 60:02d}.{ms % 1000:03d}"


def log(level, msg):
    """Print *msg* prefixed with a timestamp and a padded level tag."""
    print(f"[{_ts()}] {level:5s} {msg}")


# Replaced by the "debug" config value once config.json has been loaded.
_DEBUG = False


def info(msg):
    """Log at INFO level; suppressed unless debug output is enabled."""
    if _DEBUG:
        log("INFO", msg)


def warn(msg):
    """Log at WARN level (always printed)."""
    log("WARN", msg)


def log_err(msg):
    """Log at ERROR level (always printed)."""
    log("ERROR", msg)


# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

def _load_config():
    """Load and return /config.json as a dict.

    Raises:
        OSError: the file cannot be opened.
        Exception: the file cannot be parsed (re-raised after logging).
    """
    try:
        with open("/config.json") as f:
            cfg = ujson.load(f)
        info("Config loaded from /config.json")
        return cfg
    except OSError:
        log_err("config.json not found — cannot continue")
        raise
    except Exception as e:
        log_err(f"config.json parse error: {e} — cannot continue")
        raise


_cfg = _load_config()

DEBUG = _cfg.get("debug", False)
_DEBUG = DEBUG

WIFI_SSID = _cfg["wifi_ssid"]
WIFI_PASSWORD = _cfg["wifi_password"]
MQTT_BROKER = _cfg["mqtt_broker"]
MQTT_PORT = int(_cfg.get("mqtt_port", 1883))
MQTT_USER = _cfg["mqtt_user"]
MQTT_PASSWORD = _cfg["mqtt_password"]
MQTT_CLIENT_ID = _cfg["mqtt_client_id"]
MQTT_PREFIX = _cfg["mqtt_prefix"]
HEARTBEAT_MS = int(_cfg.get("heartbeat_ms", 10000))
REZERO_INTERVAL_MS = int(_cfg.get("rezero_interval_ms", 3600000))

# Device metadata: the nested "device" object wins over the flat legacy keys.
device_cfg = _cfg.get("device", {})
DEVICE_NAME = device_cfg.get("name", _cfg.get("device_name", "Selsyn Multi"))
DEVICE_MODEL = device_cfg.get(
    "model", _cfg.get("device_model", "Chernobyl Selsyn-inspired gauge")
)
DEVICE_MFR = device_cfg.get(
    "manufacturer", _cfg.get("device_manufacturer", "AdeBaumann")
)
DEVICE_AREA = device_cfg.get("area", _cfg.get("device_area", "Control Panels"))

gauges = []
if "gauges" in _cfg:
    for i, g in enumerate(_cfg["gauges"]):
        led_cfg = g.get("leds", {})
        gauges.append(
            {
                "id": i,
                "name": g.get("name", f"Gauge {i + 1}"),
                "min": float(g.get("min", 0)),
                "max": float(g.get("max", 100)),
                "max_steps": int(g.get("max_steps", 4000)),
                "speed": float(g.get("speed", 5000.0)),
                "acceleration": float(g.get("acceleration", 6000.0)),
                "entity_name": g.get("entity_name", f"Gauge {i + 1}"),
                "unit": g.get("unit", ""),
                "ws2812_red": tuple(led_cfg.get("ws2812_red", [255, 0, 0])),
                "ws2812_green": tuple(led_cfg.get("ws2812_green", [0, 255, 0])),
            }
        )
else:
    # Legacy single-gauge configuration using flat top-level keys.
    gauges.append(
        {
            "id": 0,
            "name": "Gauge 1",
            "min": float(_cfg.get("gauge_min", 0)),
            "max": float(_cfg.get("gauge_max", 7300)),
            "max_steps": int(_cfg.get("gauge_max_steps", 4000)),
            "speed": float(_cfg.get("gauge_speed", 5000.0)),
            "acceleration": float(_cfg.get("gauge_acceleration", 6000.0)),
            "entity_name": _cfg.get("gauge_entity_name", "Selsyn 1 Power"),
            "unit": _cfg.get("gauge_unit", "W"),
            "ws2812_red": tuple(_cfg.get("ws2812_red", [255, 0, 0])),
            "ws2812_green": tuple(_cfg.get("ws2812_green", [0, 255, 0])),
        }
    )

num_gauges = len(gauges)


# ---------------------------------------------------------------------------
# Arduino UART
# ---------------------------------------------------------------------------

ARDUINO_UART_ID = int(_cfg.get("arduino_uart", 1))
ARDUINO_TX_PIN = int(_cfg.get("arduino_tx_pin", 17))
ARDUINO_RX_PIN = int(_cfg.get("arduino_rx_pin", 16))
ARDUINO_BAUD = int(_cfg.get("arduino_baud", 115200))

_arduino = UART(ARDUINO_UART_ID, baudrate=ARDUINO_BAUD,
                tx=ARDUINO_TX_PIN, rx=ARDUINO_RX_PIN, timeout=10)


def arduino_send(cmd):
    """Send a newline-terminated command to the Arduino."""
    _arduino.write((cmd + "\n").encode())
    info(f"Arduino → {cmd}")


def arduino_recv():
    """Print any lines waiting in the Arduino RX buffer."""
    while _arduino.any():
        line = _arduino.readline()
        if not line:
            # Nothing could be read even though bytes were reported; stop
            # instead of spinning on the same condition.
            break
        try:
            text = line.decode().strip()
        except ValueError:
            # Line noise is not always valid UTF-8 (UnicodeError is a
            # ValueError); show the raw bytes rather than raising into the
            # main loop.
            text = repr(line)
        print(f"[{_ts()}] ARDU  {text}")


# ---------------------------------------------------------------------------
# Arduino command helpers
# ---------------------------------------------------------------------------

# LED indices within each gauge's segment (0-based, matching CLAUDE.md layout)
_LED_BACKLIGHT_RANGE = "0-2"  # LEDs 1-3: backlight
_LED_RED = 3                  # LED 4: red indicator
_LED_GREEN = 4                # LED 5: green indicator
_LED_STATUS_RED = 5           # LED 6: status red
_LED_STATUS_GREEN = 6         # LED 7: status green


def _val_to_steps(gauge_idx, value):
    """Map a physical value to an Arduino absolute step position.

    The result is clamped to ``0..max_steps``. A gauge configured with
    ``min == max`` has no usable scale, so it maps to step 0 instead of
    dividing by zero.
    """
    g = gauges[gauge_idx]
    span = g["max"] - g["min"]
    if span == 0:
        return 0
    frac = (value - g["min"]) / span
    return int(max(0, min(g["max_steps"], frac * g["max_steps"])))


def gauge_set(gauge_idx, value):
    """Move a gauge to the step position corresponding to *value*."""
    arduino_send(f"SET {gauge_idx} {_val_to_steps(gauge_idx, value)}")


def gauge_home(gauge_idx):
    """Send the HOME command for one gauge."""
    arduino_send(f"HOME {gauge_idx}")


def gauge_zero(gauge_idx):
    """Send the ZERO command for one gauge."""
    arduino_send(f"ZERO {gauge_idx}")


def gauge_set_speed(gauge_idx, speed):
    """Send the SPEED command for one gauge."""
    arduino_send(f"SPEED {gauge_idx} {speed}")


def gauge_set_acceleration(gauge_idx, acceleration):
    """Send the ACCEL command for one gauge."""
    arduino_send(f"ACCEL {gauge_idx} {acceleration}")


def _set_led(gauge_idx, idx, r, g, b):
    """Send a plain LED colour command; *idx* is an index or a range string."""
    arduino_send(f"LED {gauge_idx} {idx} {r} {g} {b}")


def set_backlight(gauge_idx, r, g, b, brightness):
    """Send backlight colour+brightness to the 3 backlight LEDs (0-2)."""
    scale = brightness / 100
    _set_led(gauge_idx, _LED_BACKLIGHT_RANGE,
             int(r * scale), int(g * scale), int(b * scale))


def set_red_led(gauge_idx, on):
    """Switch the red indicator LED to its configured colour, or off."""
    r, g, b = gauges[gauge_idx]["ws2812_red"] if on else (0, 0, 0)
    _set_led(gauge_idx, _LED_RED, r, g, b)


def set_green_led(gauge_idx, on):
    """Switch the green indicator LED to its configured colour, or off."""
    r, g, b = gauges[gauge_idx]["ws2812_green"] if on else (0, 0, 0)
    _set_led(gauge_idx, _LED_GREEN, r, g, b)


def set_status_led(gauge_idx, led_type, on):
    """Switch a status LED; *led_type* is "red" or "green" (others ignored)."""
    if led_type == "red":
        r, g, b = gauges[gauge_idx]["ws2812_red"] if on else (0, 0, 0)
        _set_led(gauge_idx, _LED_STATUS_RED, r, g, b)
    elif led_type == "green":
        r, g, b = gauges[gauge_idx]["ws2812_green"] if on else (0, 0, 0)
        _set_led(gauge_idx, _LED_STATUS_GREEN, r, g, b)


def _send_effect(gauge_idx, led_ref, color, effect):
    """Send a single Arduino command for the given effect (or plain LED if none).

    Always embeds color in the command — no preceding LED command needed.
    """
    r, g, b = color
    if effect not in _EFFECTS:
        arduino_send(f"LED {gauge_idx} {led_ref} {r} {g} {b}")
        return
    p = _EFFECTS[effect]
    if p[0] == "blink":
        arduino_send(f"BLINK {gauge_idx} {led_ref} {p[1]} {p[2]} {r} {g} {b}")
    elif p[0] == "breathe":
        arduino_send(f"BREATHE {gauge_idx} {led_ref} {p[1]} {r} {g} {b}")
    elif p[0] == "dflash":
        arduino_send(f"DFLASH {gauge_idx} {led_ref} {r} {g} {b}")


def _apply_blink_or_led(gauge_idx, led_idx, color, effect):
    """Set LED to color, optionally starting an effect.

    Color is always embedded in the command — avoids FastLED.show() race.
    """
    _send_effect(gauge_idx, led_idx, color, effect)


# ---------------------------------------------------------------------------
# State tracking (for MQTT state publishing)
# ---------------------------------------------------------------------------

gauge_targets = [g["min"] for g in gauges]
gauge_last_rezero = [utime.ticks_ms() for _ in gauges]
gauge_speeds = [g["speed"] for g in gauges]
gauge_accelerations = [g["acceleration"] for g in gauges]

backlight_color = [(0, 0, 0) for _ in range(num_gauges)]
backlight_brightness = [100 for _ in range(num_gauges)]
backlight_on = [False for _ in range(num_gauges)]

# Tick at which the backlight state last changed; None when nothing is pending.
_bl_dirty_since = None
_BL_SAVE_DELAY_MS = 5000

client_ref = None
_mqtt_connected = False

# Effect name -> (Arduino command kind, timing parameters...)
_EFFECTS = {
    "Blink Slow": ("blink", 800, 800),
    "Blink Fast": ("blink", 150, 150),
    "Blink Alert": ("blink", 100, 400),
    "Breathe Slow": ("breathe", 3000),
    "Breathe Fast": ("breathe", 1200),
    "Double Flash": ("dflash",),
}
_EFFECT_LIST = list(_EFFECTS.keys())

_red_effect = [None] * num_gauges
_green_effect = [None] * num_gauges
_status_red_effect = [None] * num_gauges
_status_green_effect = [None] * num_gauges
_bl_effect = [None] * num_gauges

vfd_text = ""
vfd_decimal_point = False
vfd_alarm = False


def _effect_or_none(effect):
    """Return *effect* if it names a known effect, otherwise None."""
    if isinstance(effect, str) and effect in _EFFECTS:
        return effect
    return None


def _build_vfd_command():
    """Build the VFD serial command from the current text and flag state."""
    suffix = ""
    if vfd_decimal_point:
        suffix += "."
    if vfd_alarm:
        suffix += "!"
    if vfd_text:
        return f"VFD {vfd_text}{suffix}"
    if suffix:
        # No text but at least one flag set: send "0" followed by the flags.
        return f"VFD 0{suffix}"
    return "VFD"


def send_vfd_state():
    """Send the current VFD state to the Arduino."""
    arduino_send(_build_vfd_command())


def publish_vfd_state(client):
    """Publish VFD text and both flags as retained MQTT messages."""
    client.publish(vfd_topics["state"], vfd_text, retain=True)
    client.publish(vfd_topics["decimal_point_state"],
                   b"ON" if vfd_decimal_point else b"OFF", retain=True)
    client.publish(vfd_topics["alarm_state"],
                   b"ON" if vfd_alarm else b"OFF", retain=True)


def _backlight_changed(gauge_idx, new_color, new_on, new_brightness):
    """Return True if the requested backlight state differs from the stored one.

    Brightness is only compared when the backlight is being switched on.
    """
    return (
        new_color != backlight_color[gauge_idx]
        or new_on != backlight_on[gauge_idx]
        or (new_on and new_brightness != backlight_brightness[gauge_idx])
    )


def _mark_bl_dirty():
    """Record that the backlight state changed and still has to be published."""
    global _bl_dirty_since
    _bl_dirty_since = utime.ticks_ms()


def set_backlight_color(gauge_idx, r, g, b, brightness=None):
    """Apply a backlight colour (and optionally brightness, 0-100).

    Does nothing when the requested state equals the stored one. A brightness
    of 0 switches the backlight off but keeps the last non-zero brightness.
    """
    if brightness is None:
        brightness = backlight_brightness[gauge_idx]
    new_on = brightness > 0
    if not _backlight_changed(gauge_idx, (r, g, b), new_on, brightness):
        return
    backlight_color[gauge_idx] = (r, g, b)
    if brightness > 0:
        backlight_brightness[gauge_idx] = brightness
    backlight_on[gauge_idx] = new_on
    set_backlight(gauge_idx, r, g, b, brightness)
    _mark_bl_dirty()


def set_backlight_brightness(gauge_idx, brightness):
    """Apply a backlight brightness (clamped to 0-100), keeping the colour.

    Does nothing when the requested state equals the stored one. A brightness
    of 0 switches the backlight off but keeps the last non-zero brightness.
    """
    clamped = max(0, min(100, brightness))
    new_on = clamped > 0
    if not _backlight_changed(gauge_idx, backlight_color[gauge_idx], new_on, clamped):
        return
    if clamped > 0:
        backlight_brightness[gauge_idx] = clamped
    backlight_on[gauge_idx] = new_on
    r, g, b = backlight_color[gauge_idx]
    set_backlight(gauge_idx, r, g, b, clamped)
    _mark_bl_dirty()


def publish_backlight_states(client):
    """Publish current backlight state for all gauges as retained MQTT messages."""
    for i in range(num_gauges):
        gt = gauge_topics[i]
        r, g, b = backlight_color[i]
        brightness = backlight_brightness[i]
        state = {
            "state": "ON" if backlight_on[i] else "OFF",
            "color_mode": "rgb",
            # Stored as 0-100, published on the 0-255 scale.
            "brightness": int(brightness * 2.55),
            "color": {"r": r, "g": g, "b": b},
        }
        if _bl_effect[i]:
            state["effect"] = _bl_effect[i]
        try:
            client.publish(gt["led_bl_state"], ujson.dumps(state), retain=True)
        except Exception as e:
            log_err(f"Backlight state publish failed for gauge {i}: {e}")


def _parse_backlight_fields(gauge_idx, data):
    """Extract ``(r, g, b, brightness, effect)`` from a backlight JSON dict.

    Missing colour channels fall back to the stored colour. Brightness arrives
    on the 0-255 scale and is converted to 0-100; when absent the stored
    brightness is reused (or 100 if that is zero). Raises on malformed values;
    callers handle the exception.
    """
    color = data.get("color", {})
    current = backlight_color[gauge_idx]
    r = max(0, min(255, int(color.get("r", current[0]))))
    g = max(0, min(255, int(color.get("g", current[1]))))
    b = max(0, min(255, int(color.get("b", current[2]))))
    raw_br = data.get("brightness", None)
    if raw_br is not None:
        brightness = max(0, min(100, round(int(raw_br) / 2.55)))
    elif backlight_brightness[gauge_idx] > 0:
        brightness = backlight_brightness[gauge_idx]
    else:
        brightness = 100
    return r, g, b, brightness, _effect_or_none(data.get("effect"))


def _start_backlight_effect(gauge_idx, r, g, b, brightness, effect):
    """Start *effect* on the backlight LEDs and record the resulting state."""
    scale = brightness / 100
    scaled = (int(r * scale), int(g * scale), int(b * scale))
    _send_effect(gauge_idx, _LED_BACKLIGHT_RANGE, scaled, effect)
    backlight_color[gauge_idx] = (r, g, b)
    backlight_brightness[gauge_idx] = brightness
    backlight_on[gauge_idx] = True


def restore_backlight_state(gauge_idx, payload):
    """Restore retained backlight state without republishing it back to MQTT."""
    try:
        data = ujson.loads(payload)
        state_on = str(data.get("state", "OFF")).upper() != "OFF"
        if state_on:
            r, g, b, brightness, effect = _parse_backlight_fields(gauge_idx, data)
    except Exception as e:
        # Covers bad JSON, non-object JSON and non-numeric colour/brightness.
        warn(f"Invalid retained backlight state for gauge {gauge_idx}: '{payload}' ({e})")
        return

    if not state_on:
        _bl_effect[gauge_idx] = None
        # NOTE(review): the flag is cleared before the call below, so
        # set_backlight_brightness() sees no change and sends nothing to the
        # Arduino — confirm this bookkeeping-only behaviour is intended.
        backlight_on[gauge_idx] = False
        set_backlight_brightness(gauge_idx, 0)
        return

    _bl_effect[gauge_idx] = effect
    if effect:
        _start_backlight_effect(gauge_idx, r, g, b, brightness, effect)
    else:
        set_backlight_color(gauge_idx, r, g, b, brightness)


def _flush_backlight_state():
    """Publish backlight state once it has been unchanged for the save delay."""
    global _bl_dirty_since
    if _bl_dirty_since is None:
        return
    if utime.ticks_diff(utime.ticks_ms(), _bl_dirty_since) < _BL_SAVE_DELAY_MS:
        return
    if client_ref is None:
        return
    publish_backlight_states(client_ref)
    _bl_dirty_since = None
    info("Backlight state flushed to MQTT")


def _publish(topic, payload, retain=False):
    """Safely publish MQTT message, returning True on success."""
    if client_ref is None:
        return False
    try:
        client_ref.publish(topic, payload, retain=retain)
        return True
    except Exception as e:
        log_err(f"MQTT publish failed: {e}")
        return False


# ---------------------------------------------------------------------------
# Topics (per-gauge)
# ---------------------------------------------------------------------------

def make_gauge_topics(prefix, gauge_id):
    """Return the dict of command, state and discovery topics for one gauge."""
    base = f"{prefix}/gauge{gauge_id}"
    uid = f"{MQTT_CLIENT_ID}_g{gauge_id}"
    return {
        "set": f"{base}/set",
        "state": f"{base}/state",
        "status": f"{base}/status",
        "zero": f"{base}/zero",
        "disc": f"homeassistant/number/{uid}/config",
        "speed": f"{base}/speed/set",
        "speed_state": f"{base}/speed/state",
        "speed_disc": f"homeassistant/number/{uid}_speed/config",
        "acceleration": f"{base}/acceleration/set",
        "acceleration_state": f"{base}/acceleration/state",
        "acceleration_disc": f"homeassistant/number/{uid}_acceleration/config",
        "led_red": f"{base}/led/red/set",
        "led_green": f"{base}/led/green/set",
        "led_bl": f"{base}/led/backlight/set",
        "led_red_state": f"{base}/led/red/state",
        "led_green_state": f"{base}/led/green/state",
        "led_bl_state": f"{base}/led/backlight/state",
        "led_red_disc": f"homeassistant/light/{uid}_red/config",
        "led_green_disc": f"homeassistant/light/{uid}_green/config",
        "led_bl_disc": f"homeassistant/light/{uid}_bl/config",
        "status_red": f"{base}/status_led/red/set",
        "status_green": f"{base}/status_led/green/set",
        "status_red_state": f"{base}/status_led/red/state",
        "status_green_state": f"{base}/status_led/green/state",
        "status_red_disc": f"homeassistant/light/{uid}_status_red/config",
        "status_green_disc": f"homeassistant/light/{uid}_status_green/config",
    }


gauge_topics = [make_gauge_topics(MQTT_PREFIX, g["id"]) for g in gauges]

vfd_topics = {
    "set": f"{MQTT_PREFIX}/vfd/set",
    "state": f"{MQTT_PREFIX}/vfd/state",
    "disc": f"homeassistant/text/{MQTT_CLIENT_ID}_vfd/config",
    "decimal_point": f"{MQTT_PREFIX}/vfd/decimal_point/set",
    "decimal_point_state": f"{MQTT_PREFIX}/vfd/decimal_point/state",
    "decimal_point_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_vfd_decimal_point/config",
    "alarm": f"{MQTT_PREFIX}/vfd/alarm/set",
    "alarm_state": f"{MQTT_PREFIX}/vfd/alarm/state",
    "alarm_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_vfd_alarm/config",
}

T_SET = f"{MQTT_PREFIX}/set"
T_ZERO = f"{MQTT_PREFIX}/zero"

_DEVICE = {
    "identifiers": [MQTT_CLIENT_ID],
    "name": DEVICE_NAME,
    "model": DEVICE_MODEL,
    "manufacturer": DEVICE_MFR,
    "suggested_area": DEVICE_AREA,
}


# ---------------------------------------------------------------------------
# WiFi
# ---------------------------------------------------------------------------

_wifi_check_interval_ms = 30000
_last_wifi_check = 0
_wifi_sta = None
_WIFI_CONNECT_ATTEMPTS = 3


def _reset_wifi_interface():
    """Power-cycle the station interface so the next connect starts clean."""
    global _wifi_sta
    _wifi_sta = network.WLAN(network.STA_IF)
    try:
        _wifi_sta.disconnect()
    except Exception:
        pass  # best effort: the interface may not be active yet
    try:
        _wifi_sta.active(False)
        utime.sleep_ms(500)
    except Exception:
        pass  # best effort
    _wifi_sta.active(True)
    utime.sleep_ms(500)


def connect_wifi(ssid, password, timeout_s=15, force_reconnect=False):
    """Connect to WiFi and return the assigned IP address.

    Args:
        ssid: network name.
        password: network password.
        timeout_s: seconds to wait for each connection attempt.
        force_reconnect: reset and reconnect even if already connected.

    Raises:
        Exception: the error from the last failed attempt, after
            ``_WIFI_CONNECT_ATTEMPTS`` attempts.
    """
    global _wifi_sta
    _wifi_sta = network.WLAN(network.STA_IF)
    if _wifi_sta.isconnected() and not force_reconnect:
        ip, mask, gw, dns = _wifi_sta.ifconfig()
        info("WiFi already connected")
        info(f" IP:{ip} mask:{mask} gw:{gw} dns:{dns}")
        utime.sleep_ms(250)
        return ip

    last_error = None
    for attempt in range(_WIFI_CONNECT_ATTEMPTS):
        info(f"WiFi connecting to '{ssid}' (attempt {attempt + 1}/{_WIFI_CONNECT_ATTEMPTS}) ...")
        _reset_wifi_interface()
        try:
            _wifi_sta.connect(ssid, password)
            deadline = utime.time() + timeout_s
            while not _wifi_sta.isconnected():
                if utime.time() > deadline:
                    raise OSError("WiFi connect timeout")
                utime.sleep_ms(250)
            ip, mask, gw, dns = _wifi_sta.ifconfig()
            mac = ":".join(f"{b:02x}" for b in _wifi_sta.config("mac"))
            info("WiFi connected!")
            info(f" SSID : {ssid}")
            info(f" MAC : {mac}")
            info(f" IP : {ip} mask:{mask} gw:{gw} dns:{dns}")
            utime.sleep_ms(500)
            return ip
        except Exception as e:
            last_error = e
            log_err(f"WiFi connect attempt {attempt + 1} failed: {e}")
            utime.sleep_ms(1000)
    raise last_error


def check_wifi():
    """Reconnect WiFi if the link is down; rate-limited to the check interval."""
    global _wifi_sta, _last_wifi_check
    now = utime.ticks_ms()
    if utime.ticks_diff(now, _last_wifi_check) < _wifi_check_interval_ms:
        return
    _last_wifi_check = now
    if _wifi_sta is None:
        _wifi_sta = network.WLAN(network.STA_IF)
    if _wifi_sta.isconnected():
        return
    log_err("WiFi lost connection — attempting reconnect...")
    try:
        ip = connect_wifi(WIFI_SSID, WIFI_PASSWORD, timeout_s=15, force_reconnect=True)
        info(f"WiFi reconnected! IP:{ip}")
    except Exception as e:
        log_err(f"WiFi reconnect failed: {e}")


# ---------------------------------------------------------------------------
# MQTT callbacks
# ---------------------------------------------------------------------------

def _set_gauge_target(gauge_idx, value):
    """Clamp *value* to the gauge range, store it, move the gauge, return it.

    Raises ValueError/TypeError if *value* cannot be converted to float.
    """
    g = gauges[gauge_idx]
    clamped = max(g["min"], min(g["max"], float(value)))
    gauge_targets[gauge_idx] = clamped
    gauge_set(gauge_idx, clamped)
    return clamped


def _handle_motion_param(gauge_idx, payload, label, store, apply, state_topic):
    """Handle a speed/acceleration command: validate, apply, publish state.

    *label* is "speed" or "acceleration" (used in log text), *store* is the
    per-gauge list to update and *apply* sends the value to the Arduino.
    """
    try:
        value = float(payload)
        if value <= 0.0:
            raise ValueError
    except ValueError:
        warn(f"Invalid {label} for gauge {gauge_idx}: '{payload}'")
        return
    store[gauge_idx] = value
    apply(gauge_idx, value)
    _publish(state_topic, str(value), retain=True)
    info(f"Gauge {gauge_idx} {label} -> {value}")


def _parse_onoff_payload(payload):
    """Decode an on/off LED command into ``(state_on, effect)``.

    Accepts a JSON object (``{"state": ..., "effect": ...}``) or a bare
    ON/OFF string. Anything other than "OFF" (case-insensitive) counts as on;
    effects are dropped when the LED is switched off.
    """
    try:
        data = ujson.loads(payload)
    except ValueError:
        data = None
    if not isinstance(data, dict):
        # Bare payload, or valid JSON that is not an object (e.g. "OFF", 1).
        data = {"state": data if isinstance(data, str) else payload}
    state_on = str(data.get("state", "ON")).upper() != "OFF"
    effect = _effect_or_none(data.get("effect")) if state_on else None
    return state_on, effect


def _handle_indicator_led(gauge_idx, payload, led_idx, color_key,
                          effect_store, state_topic, label):
    """Handle an on/off(+effect) command for one indicator or status LED."""
    state_on, effect = _parse_onoff_payload(payload)
    effect_store[gauge_idx] = effect
    color = gauges[gauge_idx][color_key] if state_on else (0, 0, 0)
    _apply_blink_or_led(gauge_idx, led_idx, color, effect)
    pub = {"state": "ON" if state_on else "OFF"}
    if effect:
        pub["effect"] = effect
    _publish(state_topic, ujson.dumps(pub), retain=True)
    info(f"Gauge {gauge_idx} {label} → {'ON' if state_on else 'OFF'}{' [' + effect + ']' if effect else ''}")


def _handle_backlight_command(gauge_idx, gt, payload):
    """Handle a JSON backlight command (state, colour, brightness, effect)."""
    try:
        data = ujson.loads(payload)
        turn_off = str(data.get("state", "ON")).upper() == "OFF"
        if not turn_off:
            r, g, b, brightness, effect = _parse_backlight_fields(gauge_idx, data)
    except Exception as e:
        warn(f"Invalid backlight payload for gauge {gauge_idx}: '{payload}' ({e})")
        return

    if turn_off:
        _bl_effect[gauge_idx] = None
        set_backlight_brightness(gauge_idx, 0)
        _publish(gt["led_bl_state"], ujson.dumps({"state": "OFF"}), retain=True)
        info(f"Gauge {gauge_idx} backlight → OFF")
        return

    _bl_effect[gauge_idx] = effect
    if effect:
        _start_backlight_effect(gauge_idx, r, g, b, brightness, effect)
        _mark_bl_dirty()
    else:
        set_backlight_color(gauge_idx, r, g, b, brightness)

    state = {
        "state": "ON",
        "color_mode": "rgb",
        "brightness": int(brightness * 2.55),
        "color": {"r": r, "g": g, "b": b},
    }
    if effect:
        state["effect"] = effect
    _publish(gt["led_bl_state"], ujson.dumps(state), retain=True)
    info(f"Gauge {gauge_idx} backlight → #{r:02x}{g:02x}{b:02x} @ {brightness}%{' [' + effect + ']' if effect else ''}")


def _handle_set_all(payload):
    """Handle the global set topic: a JSON object of values or a single number."""
    try:
        data = ujson.loads(payload)
    except ValueError:
        data = None  # not JSON; float() below still gets a chance
    try:
        if isinstance(data, dict):
            # NOTE(review): values are assigned to gauges by iteration order
            # of the decoded dict — confirm that order matches the sender's
            # key order on this interpreter.
            for i, val in enumerate(data.values()):
                if i >= num_gauges:
                    break
                cval = _set_gauge_target(i, val)
                info(f"Gauge {i} target → {cval:.1f}")
        else:
            val = float(payload)
            for i in range(num_gauges):
                _set_gauge_target(i, val)
            info(f"All gauges target → {val:.1f}")
    except (ValueError, TypeError):
        warn(f"Invalid set value: '{payload}'")


def on_message(topic, payload):
    """MQTT callback: dispatch an incoming message to the matching handler.

    *topic* and *payload* arrive as bytes. Unknown topics are ignored.
    """
    global vfd_text, vfd_decimal_point, vfd_alarm
    if client_ref is None:
        return
    topic = topic.decode()
    payload = payload.decode().strip()
    info(f"MQTT rx {topic} {payload}")

    if topic == vfd_topics["set"]:
        vfd_text = payload.upper()
        send_vfd_state()
        publish_vfd_state(client_ref)
        info(f"VFD text -> {vfd_text}")
        return
    if topic == vfd_topics["decimal_point"]:
        vfd_decimal_point = payload.upper() == "ON"
        send_vfd_state()
        publish_vfd_state(client_ref)
        info(f"VFD decimal point -> {'ON' if vfd_decimal_point else 'OFF'}")
        return
    if topic == vfd_topics["alarm"]:
        vfd_alarm = payload.upper() == "ON"
        send_vfd_state()
        publish_vfd_state(client_ref)
        info(f"VFD alarm -> {'ON' if vfd_alarm else 'OFF'}")
        return

    for i, gt in enumerate(gauge_topics):
        if topic == gt["led_bl_state"]:
            restore_backlight_state(i, payload)
            info(f"Gauge {i} backlight state restored")
            return
        if topic == gt["zero"]:
            info(f"Home command received for gauge {i}")
            gauge_home(i)
            gauge_last_rezero[i] = utime.ticks_ms()
            return
        if topic == gt["set"]:
            try:
                val = _set_gauge_target(i, payload)
                info(f"Gauge {i} target → {val:.1f}")
            except ValueError:
                warn(f"Invalid set value for gauge {i}: '{payload}'")
            return
        if topic == gt["speed"]:
            _handle_motion_param(i, payload, "speed", gauge_speeds,
                                 gauge_set_speed, gt["speed_state"])
            return
        if topic == gt["acceleration"]:
            _handle_motion_param(i, payload, "acceleration", gauge_accelerations,
                                 gauge_set_acceleration, gt["acceleration_state"])
            return
        if topic == gt["led_red"]:
            _handle_indicator_led(i, payload, _LED_RED, "ws2812_red",
                                  _red_effect, gt["led_red_state"], "red LED")
            return
        if topic == gt["led_green"]:
            _handle_indicator_led(i, payload, _LED_GREEN, "ws2812_green",
                                  _green_effect, gt["led_green_state"], "green LED")
            return
        if topic == gt["led_bl"]:
            _handle_backlight_command(i, gt, payload)
            return
        if topic == gt["status_red"]:
            _handle_indicator_led(i, payload, _LED_STATUS_RED, "ws2812_red",
                                  _status_red_effect, gt["status_red_state"], "status red")
            return
        if topic == gt["status_green"]:
            _handle_indicator_led(i, payload, _LED_STATUS_GREEN, "ws2812_green",
                                  _status_green_effect, gt["status_green_state"], "status green")
            return

    if topic == T_ZERO:
        for i in range(num_gauges):
            gauge_home(i)
            gauge_last_rezero[i] = utime.ticks_ms()
        info("All gauges homed")
        return
    if topic == T_SET:
        _handle_set_all(payload)
        return


# ---------------------------------------------------------------------------
# MQTT connect + discovery
# ---------------------------------------------------------------------------

# Per-gauge topic keys the device subscribes to. "led_bl_state" is included so
# the retained backlight state is delivered back to the device.
_GAUGE_SUB_KEYS = (
    "set", "zero", "speed", "acceleration",
    "led_red", "led_green", "led_bl", "led_bl_state",
    "status_red", "status_green",
)


def _subscribe_all(c):
    """Subscribe client *c* to every command topic this device handles."""
    c.subscribe(T_SET)
    c.subscribe(T_ZERO)
    c.subscribe(vfd_topics["set"])
    c.subscribe(vfd_topics["decimal_point"])
    c.subscribe(vfd_topics["alarm"])
    for gt in gauge_topics:
        for key in _GAUGE_SUB_KEYS:
            c.subscribe(gt[key])


def _new_mqtt_client():
    """Create an unconnected MQTT client with the message callback installed."""
    client = MQTTClient(
        client_id=MQTT_CLIENT_ID,
        server=MQTT_BROKER,
        port=MQTT_PORT,
        user=MQTT_USER,
        password=MQTT_PASSWORD,
        keepalive=30,
    )
    client.set_callback(on_message)
    return client


def _close_client(client):
    """Best-effort disconnect of a client that is about to be replaced."""
    if client is None:
        return
    try:
        client.disconnect()
    except Exception:
        pass  # the connection is already gone or was never opened


def connect_mqtt():
    """Connect to the broker (up to 3 attempts) and store the client.

    Raises:
        Exception: the error from the last failed attempt.
    """
    global client_ref, _mqtt_connected
    info(f"Connecting to MQTT broker {MQTT_BROKER}:{MQTT_PORT} ...")
    last_error = None
    for attempt in range(3):
        try:
            _close_client(client_ref)
            client = _new_mqtt_client()
            client.connect()
            client_ref = client
            _mqtt_connected = True
            info(f"MQTT connected client_id={MQTT_CLIENT_ID}")
            return
        except Exception as e:
            last_error = e
            log_err(f"MQTT connect attempt {attempt + 1} failed: {e}")
            utime.sleep_ms(1000)
    _mqtt_connected = False
    raise last_error


_mqtt_check_interval_ms = 30000
_last_mqtt_check = 0

_discovery_queue = []
_discovery_idx = 0
_last_discovery_ms = 0
_DISCOVERY_INTERVAL_MS = 350


def _compact_discovery_payload(payload):
    """Return a copy of a discovery payload with optional effect fields removed.

    The trimming is unconditional for JSON-schema (light) entities; it is not
    gated on the amount of free RAM.
    """
    compact = dict(payload)
    # Light entities are the largest payloads because they repeat effect metadata.
    # Keep core functionality, but omit optional effect declarations to reduce heap use.
    if compact.get("schema") == "json":
        compact.pop("effect", None)
        compact.pop("effect_list", None)
    return compact


def check_mqtt():
    """Ping the broker and reconnect if needed; return True when connected.

    Rate-limited: between checks the last known connection state is returned.
    After a successful reconnect, discovery is rescheduled and subscriptions
    and state are re-published.
    """
    global client_ref, _mqtt_connected, _last_mqtt_check
    now = utime.ticks_ms()
    if utime.ticks_diff(now, _last_mqtt_check) < _mqtt_check_interval_ms:
        return _mqtt_connected
    _last_mqtt_check = now
    if client_ref is None:
        return False
    try:
        client_ref.ping()
        _mqtt_connected = True
        return True
    except Exception as e:
        log_err(f"MQTT connection lost: {e}")
        _mqtt_connected = False

    log_err("Attempting MQTT reconnection...")
    for attempt in range(3):
        try:
            # Release the stale client first, as connect_mqtt() does, so its
            # socket is not leaked when the reference is replaced.
            _close_client(client_ref)
            client_ref = _new_mqtt_client()
            # NOTE(review): _mqtt_connect_with_timeout is not defined in the
            # part of this file reviewed here. If it does not exist, every
            # attempt raises NameError (caught below) and reconnection can
            # never succeed — confirm, or use client_ref.connect() instead.
            _mqtt_connect_with_timeout(client_ref)
            _mqtt_connected = True
            info("MQTT reconnected!")
            schedule_discovery()
            _subscribe_all(client_ref)
            publish_online(client_ref)
            publish_state(client_ref)
            publish_backlight_states(client_ref)
            return True
        except Exception as e2:
            log_err(f"MQTT reconnect attempt {attempt + 1} failed: {e2}")
            utime.sleep_ms(2000)
    log_err("MQTT reconnection failed after 3 attempts")
    return False


def _publish_discovery_entity(client, topic, payload, log_msg):
    """Publish one compacted discovery payload as a retained message."""
    gc.collect()
    client.publish(topic, ujson.dumps(_compact_discovery_payload(payload)), retain=True)
    info(log_msg)


def _append_gauge_discovery(entries, dev_ref):
    """Queue the number entity for each gauge's target value."""
    for i, g in enumerate(gauges):
        gt = gauge_topics[i]
        entries.append(
            (
                gt["disc"],
                {
                    "name": g["entity_name"],
                    "unique_id": f"{MQTT_CLIENT_ID}_g{i}",
                    "cmd_t": gt["set"],
                    "stat_t": gt["state"],
                    "avty_t": gt["status"],
                    "min": g["min"],
                    "max": g["max"],
                    "step": 1,
                    "unit_of_meas": g["unit"],
                    "icon": "mdi:gauge",
                    "dev": dev_ref,
                },
                f"Discovery: gauge {i} ({g['name']})",
            )
        )


def _append_speed_discovery(entries, dev_ref):
    """Queue the speed configuration number entity for each gauge."""
    for i, g in enumerate(gauges):
        gt = gauge_topics[i]
        entries.append(
            (
                gt["speed_disc"],
                {
                    "name": f"{g['name']} Speed",
                    "unique_id": f"{MQTT_CLIENT_ID}_g{i}_speed",
                    "cmd_t": gt["speed"],
                    "stat_t": gt["speed_state"],
                    "avty_t": gt["status"],
                    "min": 1,
                    "max": 50000,
                    "step": 1,
                    "mode": "box",
                    "unit_of_meas": "steps/s",
                    "icon": "mdi:speedometer",
                    "entity_category": "config",
                    "dev": dev_ref,
                },
                f"Discovery: gauge {i} speed",
            )
        )


def _append_acceleration_discovery(entries, dev_ref):
    """Queue the acceleration configuration number entity for each gauge."""
    for i, g in enumerate(gauges):
        gt = gauge_topics[i]
        entries.append(
            (
                gt["acceleration_disc"],
                {
                    "name": f"{g['name']} Acceleration",
                    "unique_id": f"{MQTT_CLIENT_ID}_g{i}_acceleration",
                    "cmd_t": gt["acceleration"],
                    "stat_t": gt["acceleration_state"],
                    "avty_t": gt["status"],
                    "min": 1,
                    "max": 100000,
                    "step": 1,
                    "mode": "box",
                    "unit_of_meas": "steps/s2",
                    "icon": "mdi:chart-bell-curve-cumulative",
                    "entity_category": "config",
                    "dev": dev_ref,
                },
                f"Discovery: gauge {i} acceleration",
            )
        )


def _light_payload(name, uid, cmd_topic, state_topic, color_mode, icon, dev_ref):
    """Build the discovery payload shared by all JSON-schema light entities."""
    return {
        "name": name,
        "uniq_id": uid,
        "cmd_t": cmd_topic,
        "stat_t": state_topic,
        "schema": "json",
        "supported_color_modes": [color_mode],
        "effect": True,
        "effect_list": _EFFECT_LIST,
        "icon": icon,
        "dev": dev_ref,
        "ret": True,
    }


def _append_indicator_led_discovery(entries, dev_ref):
    """Queue the red and green dial indicator light entities for each gauge."""
    for i, g in enumerate(gauges):
        gt = gauge_topics[i]
        entries.append(
            (
                gt["led_red_disc"],
                _light_payload(f"{g['name']} Dial Red LED", f"{MQTT_CLIENT_ID}_g{i}_red",
                               gt["led_red"], gt["led_red_state"],
                               "onoff", "mdi:led-on", dev_ref),
                f"Discovery: gauge {i} red LED",
            )
        )
        entries.append(
            (
                gt["led_green_disc"],
                _light_payload(f"{g['name']} Dial Green LED", f"{MQTT_CLIENT_ID}_g{i}_green",
                               gt["led_green"], gt["led_green_state"],
                               "onoff", "mdi:led-on", dev_ref),
                f"Discovery: gauge {i} green LED",
            )
        )


def _append_backlight_status_discovery(entries, dev_ref):
    """Queue the backlight and both status light entities for each gauge."""
    for i, g in enumerate(gauges):
        gt = gauge_topics[i]
        entries.append(
            (
                gt["led_bl_disc"],
                _light_payload(f"{g['name']} Backlight", f"{MQTT_CLIENT_ID}_g{i}_bl",
                               gt["led_bl"], gt["led_bl_state"],
                               "rgb", "mdi:led-strip", dev_ref),
                f"Discovery: gauge {i} backlight",
            )
        )
        entries.append(
            (
                gt["status_red_disc"],
                _light_payload(f"{g['name']} Channel Status Red",
                               f"{MQTT_CLIENT_ID}_g{i}_status_red",
                               gt["status_red"], gt["status_red_state"],
                               "onoff", "mdi:led-on", dev_ref),
                f"Discovery: gauge {i} status red",
            )
        )
        entries.append(
            (
                gt["status_green_disc"],
                _light_payload(f"{g['name']} Channel Status Green",
                               f"{MQTT_CLIENT_ID}_g{i}_status_green",
                               gt["status_green"], gt["status_green_state"],
                               "onoff", "mdi:led-on", dev_ref),
                f"Discovery: gauge {i} status green",
            )
        )


def _append_vfd_discovery(entries, dev_ref):
    """Queue the VFD text entity and its two switch entities.

    Availability is tied to the first gauge's status topic.
    """
    entries.append(
        (
            vfd_topics["disc"],
            {
                "name": "VFD Display",
                "unique_id": f"{MQTT_CLIENT_ID}_vfd",
                "cmd_t": vfd_topics["set"],
                "stat_t": vfd_topics["state"],
                "avty_t": gauge_topics[0]["status"],
                "icon": "mdi:alpha-box",
                "dev": dev_ref,
            },
            "Discovery: VFD text",
        )
    )
    entries.append(
        (
            vfd_topics["decimal_point_disc"],
            {
                "name": "VFD Decimal Point",
                "unique_id": f"{MQTT_CLIENT_ID}_vfd_decimal_point",
                "cmd_t": vfd_topics["decimal_point"],
                "stat_t": vfd_topics["decimal_point_state"],
                "avty_t": gauge_topics[0]["status"],
                "pl_on": "ON",
                "pl_off": "OFF",
                "icon": "mdi:circle-small",
                "dev": dev_ref,
            },
            "Discovery: VFD decimal point",
        )
    )
    entries.append(
        (
            vfd_topics["alarm_disc"],
            {
                "name": "VFD Alarm",
                "unique_id": f"{MQTT_CLIENT_ID}_vfd_alarm",
                "cmd_t": vfd_topics["alarm"],
                "stat_t": vfd_topics["alarm_state"],
                "avty_t": gauge_topics[0]["status"],
                "pl_on": "ON",
                "pl_off": "OFF",
                "icon": "mdi:alarm-bell",
                "dev": dev_ref,
            },
            "Discovery: VFD alarm",
        )
    )


def schedule_discovery():
    """Rebuild the discovery queue; service_discovery() drains it gradually."""
    global _discovery_queue, _discovery_idx, _last_discovery_ms
    _dev_ref = _DEVICE
    entries = []
    _append_legacy_discovery(entries)
    _append_gauge_discovery(entries, _dev_ref)
    _append_speed_discovery(entries, _dev_ref)
    _append_acceleration_discovery(entries, _dev_ref)
    _append_indicator_led_discovery(entries, _dev_ref)
    _append_backlight_status_discovery(entries, _dev_ref)
    _append_vfd_discovery(entries, _dev_ref)
    _discovery_queue = entries
    _discovery_idx = 0
    _last_discovery_ms = 0


def _append_legacy_discovery(entries):
    """Queue empty retained payloads on the old switch discovery topics.

    The LEDs are now announced as light entities; the empty payloads target
    the switch topics that carried them before.
    """
    for i in range(num_gauges):
        for old_t in [
            f"homeassistant/switch/{MQTT_CLIENT_ID}_g{i}_red/config",
            f"homeassistant/switch/{MQTT_CLIENT_ID}_g{i}_green/config",
            f"homeassistant/switch/{MQTT_CLIENT_ID}_g{i}_status_red/config",
            f"homeassistant/switch/{MQTT_CLIENT_ID}_g{i}_status_green/config",
        ]:
            entries.append((old_t, b"", None))


def service_discovery():
    """Publish at most one queued discovery entry per discovery interval.

    A failed publish is logged and the entry is skipped, not retried.
    """
    global _discovery_idx, _last_discovery_ms
    if client_ref is None or _discovery_idx >= len(_discovery_queue):
        return
    now = utime.ticks_ms()
    if _last_discovery_ms and utime.ticks_diff(now, _last_discovery_ms) < _DISCOVERY_INTERVAL_MS:
        return
    gc.collect()
    topic, payload, log_msg = _discovery_queue[_discovery_idx]
    try:
        if isinstance(payload, bytes):
            # Raw payload (legacy cleanup entry): publish as-is.
            client_ref.publish(topic, payload, retain=True)
        else:
            _publish_discovery_entity(client_ref, topic, payload, log_msg)
    except Exception as e:
        log_err(f"Discovery publish failed for {topic}: {e}")
    _discovery_idx += 1
    _last_discovery_ms = utime.ticks_ms()
    gc.collect()


def publish_online(client):
    """Publish a retained "online" status for every gauge."""
    for i in range(num_gauges):
        client.publish(gauge_topics[i]["status"], b"online", retain=True)


def publish_state(client):
    """Publish target, speed and acceleration per gauge, then the VFD state."""
    for i in range(num_gauges):
        gt = gauge_topics[i]
        client.publish(gt["state"], str(gauge_targets[i]))
        client.publish(gt["speed_state"], str(gauge_speeds[i]), retain=True)
        client.publish(gt["acceleration_state"], str(gauge_accelerations[i]), retain=True)
    publish_vfd_state(client)


def apply_motion_defaults():
    """Send the stored speed/acceleration of every gauge and the VFD state."""
    for i in range(num_gauges):
        gauge_set_speed(i, gauge_speeds[i])
        gauge_set_acceleration(i, gauge_accelerations[i])
    send_vfd_state()


#
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------


def main():
    """Bring the controller up and run the service loop.

    Start-up sequence:
      1. Connect WiFi, then retry the MQTT connection until it succeeds
         (5 s between attempts, forced WiFi reconnect on every third failure).
      2. Subscribe, queue the discovery entries and apply motion defaults.
      3. Poll the broker 50 times at 20 ms intervals for pending messages.
      4. Send ``HOMEALL`` to the Arduino and stamp every gauge's re-zero time.
      5. Publish online/state and mark the running firmware as good for OTA
         when the optional ``ota`` module is present.

    The loop then services WiFi, MQTT, discovery, the Arduino UART and the
    backlight state, re-homes each gauge every ``REZERO_INTERVAL_MS`` and
    publishes state every ``HEARTBEAT_MS``. Exceptions raised inside one
    iteration are printed and logged, and the loop carries on; the function
    does not return normally.
    """
    # Imported once here instead of inside the error handler on every failure.
    import sys

    info("=" * 48)
    info("Gauge MQTT controller starting")
    info("=" * 48)

    connect_wifi(WIFI_SSID, WIFI_PASSWORD, force_reconnect=False)

    # Keep trying until the broker accepts the connection.
    mqtt_attempts = 0
    while True:
        try:
            connect_mqtt()
            break
        except Exception as e:
            mqtt_attempts += 1
            log_err(f"MQTT connect failed: {e} (attempt {mqtt_attempts})")
            if mqtt_attempts % 3 == 0:
                log_err("WiFi may be stale — forcing reconnect...")
                try:
                    connect_wifi(WIFI_SSID, WIFI_PASSWORD, force_reconnect=True)
                except Exception as we:
                    log_err(f"WiFi reconnect failed: {we}")
            utime.sleep_ms(5000)

    _subscribe_all(client_ref)
    schedule_discovery()
    apply_motion_defaults()

    info("Draining initial retained messages...")
    for _ in range(50):
        client_ref.check_msg()
        utime.sleep_ms(20)

    info("Homing all gauges on startup ...")
    arduino_send("HOMEALL")
    for i in range(num_gauges):
        gauge_last_rezero[i] = utime.ticks_ms()
    info("Home command sent")

    info("Publishing state...")
    publish_online(client_ref)
    publish_state(client_ref)
    utime.sleep_ms(50)
    for _ in range(5):
        client_ref.check_msg()
        utime.sleep_ms(20)

    info("Entering main loop")
    info("-" * 48)

    # OTA confirmation is best-effort: a missing module is expected, and any
    # other failure is reported instead of being silently discarded. Catching
    # Exception (not a bare except) lets KeyboardInterrupt/SystemExit through.
    try:
        import ota

        ota.mark_ok()
    except ImportError:
        info("OTA module not available — skipping mark_ok")
    except Exception as e:
        warn(f"OTA mark_ok failed: {e}")

    gc.collect()
    last_heartbeat = utime.ticks_ms()

    while True:
        try:
            now = utime.ticks_ms()
            check_wifi()
            gc.collect()
            if not check_mqtt():
                # No broker connection: back off and retry on the next pass.
                utime.sleep_ms(1000)
                continue
            client_ref.check_msg()
            service_discovery()
            arduino_recv()
            _flush_backlight_state()
            gc.collect()

            # Periodic re-home
            for i in range(num_gauges):
                if utime.ticks_diff(now, gauge_last_rezero[i]) >= REZERO_INTERVAL_MS:
                    info(f"Periodic re-home: gauge {i}")
                    gauge_home(i)
                    gauge_last_rezero[i] = now

            if utime.ticks_diff(now, last_heartbeat) >= HEARTBEAT_MS:
                info(f"Heartbeat: {gauge_targets}")
                publish_state(client_ref)
                last_heartbeat = now
                # NOTE(review): the collect + heap log are assumed to belong
                # to the heartbeat branch rather than every pass — confirm.
                gc.collect()
                info(f"Heap free: {gc.mem_free()} bytes")

            utime.sleep_ms(10)
        except Exception as e:
            sys.print_exception(e)
            log_err(f"Main loop error: {e} — continuing")
            gc.collect()
            utime.sleep_ms(100)


main()