From cf2c55f5cf34abcd76c86e440cfc11e38ce8091d Mon Sep 17 00:00:00 2001 From: "Adrian A. Baumann" Date: Tue, 14 Apr 2026 21:17:28 +0200 Subject: [PATCH] Code for attached ESP32-MQTT-receiver added --- config.example.json | 55 +++ gauge.py | 871 ++++++++++++++++++++++++++++++++++++++++ ota.py | 474 ++++++++++++++++++++++ ota_config.example.json | 9 + ota_manifest.txt | 1 + 5 files changed, 1410 insertions(+) create mode 100644 config.example.json create mode 100644 gauge.py create mode 100644 ota.py create mode 100644 ota_config.example.json create mode 100644 ota_manifest.txt diff --git a/config.example.json b/config.example.json new file mode 100644 index 0000000..a691d98 --- /dev/null +++ b/config.example.json @@ -0,0 +1,55 @@ +{ + "debug": false, + + "wifi_ssid": "MyNetwork", + "wifi_password": "MyPassword", + + "mqtt_broker": "192.168.1.10", + "mqtt_port": 1883, + "mqtt_user": "mqtt_user", + "mqtt_password": "mqtt_password", + "mqtt_client_id": "gauge_controller", + "mqtt_prefix": "gauges", + + "heartbeat_ms": 10000, + "rezero_interval_ms": 3600000, + + "device": { + "name": "Selsyn Multi", + "model": "Chernobyl Selsyn-inspired gauge", + "manufacturer": "AdeBaumann", + "area": "Control Panels" + }, + + "arduino_uart": 1, + "arduino_tx_pin": 17, + "arduino_rx_pin": 16, + "arduino_baud": 115200, + + "gauges": [ + { + "name": "Gauge 1", + "entity_name": "Selsyn 1 Power", + "min": 0, + "max": 7300, + "max_steps": 4000, + "unit": "W", + "leds": { + "ws2812_red": [255, 0, 0], + "ws2812_green": [0, 255, 0] + } + }, + { + "name": "Gauge 2", + "entity_name": "Selsyn 2 Power", + "min": 0, + "max": 7300, + "max_steps": 4000, + "unit": "W", + "leds": { + "ws2812_red": [255, 0, 0], + "ws2812_green": [0, 255, 0] + } + } + ] +} diff --git a/gauge.py b/gauge.py new file mode 100644 index 0000000..b0cdd47 --- /dev/null +++ b/gauge.py @@ -0,0 +1,871 @@ +""" +gauge.py — MQTT-based gauge controller for ESP32 / MicroPython + +Bridges MQTT commands to an Arduino running the gauge controller firmware +over UART. The Arduino handles all stepper motor motion and LED output. + +LED layout per gauge (0-based indices within the Arduino gauge LED segment): + 0-2 backlight (3 LEDs) + 3 red indicator + 4 green indicator + 5 status red + 6 status green + +Serial commands follow the Arduino gauge controller protocol (see CLAUDE.md). + +Additional config.json fields: + arduino_uart — UART peripheral number (default 1) + arduino_tx_pin — ESP32 GPIO for TX (default 17) + arduino_rx_pin — ESP32 GPIO for RX (default 16) + arduino_baud — baud rate (default 115200) + gauge.max_steps — full-scale step count on the Arduino (default 4000) +""" + +import network +import utime +import ujson +from umqtt.robust import MQTTClient +from machine import UART + +# --------------------------------------------------------------------------- +# Logging +# --------------------------------------------------------------------------- + + +def _ts(): + ms = utime.ticks_ms() + return f"{(ms // 3600000) % 24:02d}:{(ms // 60000) % 60:02d}:{(ms // 1000) % 60:02d}.{ms % 1000:03d}" + + +def log(level, msg): + print(f"[{_ts()}] {level:5s} {msg}") + + +_DEBUG = False + + +def info(msg): + if _DEBUG: + log("INFO", msg) + + +def warn(msg): + log("WARN", msg) + + +def log_err(msg): + log("ERROR", msg) + + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + + +def _load_config(): + try: + with open("/config.json") as f: + cfg = ujson.load(f) + info("Config loaded from /config.json") + return cfg + except OSError: + log_err("config.json not found — cannot continue") + raise + except Exception as e: + log_err(f"config.json parse error: {e} — cannot continue") + raise + + +_cfg = _load_config() + +DEBUG = _cfg.get("debug", False) +_DEBUG = DEBUG + +WIFI_SSID = _cfg["wifi_ssid"] +WIFI_PASSWORD = _cfg["wifi_password"] +MQTT_BROKER = _cfg["mqtt_broker"] +MQTT_PORT = int(_cfg.get("mqtt_port", 1883)) +MQTT_USER = _cfg["mqtt_user"] +MQTT_PASSWORD = _cfg["mqtt_password"] +MQTT_CLIENT_ID = _cfg["mqtt_client_id"] +MQTT_PREFIX = _cfg["mqtt_prefix"] + +HEARTBEAT_MS = int(_cfg.get("heartbeat_ms", 10000)) +REZERO_INTERVAL_MS = int(_cfg.get("rezero_interval_ms", 3600000)) + +device_cfg = _cfg.get("device", {}) +DEVICE_NAME = device_cfg.get("name", _cfg.get("device_name", "Selsyn Multi")) +DEVICE_MODEL = device_cfg.get( + "model", _cfg.get("device_model", "Chernobyl Selsyn-inspired gauge") +) +DEVICE_MFR = device_cfg.get( + "manufacturer", _cfg.get("device_manufacturer", "AdeBaumann") +) +DEVICE_AREA = device_cfg.get("area", _cfg.get("device_area", "Control Panels")) + +gauges = [] +if "gauges" in _cfg: + for i, g in enumerate(_cfg["gauges"]): + led_cfg = g.get("leds", {}) + gauges.append( + { + "id": i, + "name": g.get("name", f"Gauge {i + 1}"), + "min": float(g.get("min", 0)), + "max": float(g.get("max", 100)), + "max_steps": int(g.get("max_steps", 4000)), + "entity_name": g.get("entity_name", f"Gauge {i + 1}"), + "unit": g.get("unit", ""), + "ws2812_red": tuple(led_cfg.get("ws2812_red", [255, 0, 0])), + "ws2812_green": tuple(led_cfg.get("ws2812_green", [0, 255, 0])), + } + ) +else: + gauges.append( + { + "id": 0, + "name": "Gauge 1", + "min": float(_cfg.get("gauge_min", 0)), + "max": float(_cfg.get("gauge_max", 7300)), + "max_steps": int(_cfg.get("gauge_max_steps", 4000)), + "entity_name": _cfg.get("gauge_entity_name", "Selsyn 1 Power"), + "unit": _cfg.get("gauge_unit", "W"), + "ws2812_red": tuple(_cfg.get("ws2812_red", [255, 0, 0])), + "ws2812_green": tuple(_cfg.get("ws2812_green", [0, 255, 0])), + } + ) + +num_gauges = len(gauges) + +# --------------------------------------------------------------------------- +# Arduino UART +# --------------------------------------------------------------------------- + +ARDUINO_UART_ID = int(_cfg.get("arduino_uart", 1)) +ARDUINO_TX_PIN = int(_cfg.get("arduino_tx_pin", 17)) +ARDUINO_RX_PIN = int(_cfg.get("arduino_rx_pin", 16)) +ARDUINO_BAUD = int(_cfg.get("arduino_baud", 115200)) + +_arduino = UART(ARDUINO_UART_ID, baudrate=ARDUINO_BAUD, tx=ARDUINO_TX_PIN, rx=ARDUINO_RX_PIN) + + +def arduino_send(cmd): + """Send a newline-terminated command to the Arduino.""" + _arduino.write((cmd + "\n").encode()) + info(f"Arduino → {cmd}") + + +# --------------------------------------------------------------------------- +# Arduino command helpers +# --------------------------------------------------------------------------- + +# LED indices within each gauge's segment (0-based, matching CLAUDE.md layout) +_LED_BACKLIGHT_RANGE = "0-2" # LEDs 1-3: backlight +_LED_RED = 3 # LED 4: red indicator +_LED_GREEN = 4 # LED 5: green indicator +_LED_STATUS_RED = 5 # LED 6: status red +_LED_STATUS_GREEN = 6 # LED 7: status green + + +def _val_to_steps(gauge_idx, value): + """Map a physical value to an Arduino absolute step position.""" + g = gauges[gauge_idx] + frac = (value - g["min"]) / (g["max"] - g["min"]) + return int(max(0, min(g["max_steps"], frac * g["max_steps"]))) + + +def gauge_set(gauge_idx, value): + arduino_send(f"SET {gauge_idx} {_val_to_steps(gauge_idx, value)}") + + +def gauge_home(gauge_idx): + arduino_send(f"HOME {gauge_idx}") + + +def gauge_zero(gauge_idx): + arduino_send(f"ZERO {gauge_idx}") + + +def _set_led(gauge_idx, idx, r, g, b): + arduino_send(f"LED {gauge_idx} {idx} {r} {g} {b}") + + +def set_backlight(gauge_idx, r, g, b, brightness): + """Send backlight colour+brightness to the 3 backlight LEDs (0-2).""" + scale = brightness / 100 + _set_led(gauge_idx, _LED_BACKLIGHT_RANGE, int(r * scale), int(g * scale), int(b * scale)) + + +def set_red_led(gauge_idx, on): + r, g, b = gauges[gauge_idx]["ws2812_red"] if on else (0, 0, 0) + _set_led(gauge_idx, _LED_RED, r, g, b) + + +def set_green_led(gauge_idx, on): + r, g, b = gauges[gauge_idx]["ws2812_green"] if on else (0, 0, 0) + _set_led(gauge_idx, _LED_GREEN, r, g, b) + + +def set_status_led(gauge_idx, led_type, on): + if led_type == "red": + r, g, b = gauges[gauge_idx]["ws2812_red"] if on else (0, 0, 0) + _set_led(gauge_idx, _LED_STATUS_RED, r, g, b) + elif led_type == "green": + r, g, b = gauges[gauge_idx]["ws2812_green"] if on else (0, 0, 0) + _set_led(gauge_idx, _LED_STATUS_GREEN, r, g, b) + + +# --------------------------------------------------------------------------- +# State tracking (for MQTT state publishing) +# --------------------------------------------------------------------------- + +gauge_targets = [g["min"] for g in gauges] +gauge_last_rezero = [utime.ticks_ms() for _ in gauges] + +backlight_color = [(0, 0, 0) for _ in range(num_gauges)] +backlight_brightness = [100 for _ in range(num_gauges)] +backlight_on = [False for _ in range(num_gauges)] + +_bl_dirty_since = None +_BL_SAVE_DELAY_MS = 5000 + +client_ref = None +_mqtt_connected = False + + +def _backlight_changed(gauge_idx, new_color, new_on, new_brightness): + return ( + new_color != backlight_color[gauge_idx] + or new_on != backlight_on[gauge_idx] + or (new_on and new_brightness != backlight_brightness[gauge_idx]) + ) + + +def _mark_bl_dirty(): + global _bl_dirty_since + _bl_dirty_since = utime.ticks_ms() + + +def set_backlight_color(gauge_idx, r, g, b, brightness=None): + global backlight_color, backlight_brightness, backlight_on + if brightness is None: + brightness = backlight_brightness[gauge_idx] + new_on = brightness > 0 + if not _backlight_changed(gauge_idx, (r, g, b), new_on, brightness): + return + backlight_color[gauge_idx] = (r, g, b) + if brightness > 0: + backlight_brightness[gauge_idx] = brightness + backlight_on[gauge_idx] = new_on + set_backlight(gauge_idx, r, g, b, brightness) + _mark_bl_dirty() + + +def set_backlight_brightness(gauge_idx, brightness): + global backlight_brightness, backlight_on + clamped = max(0, min(100, brightness)) + new_on = clamped > 0 + if not _backlight_changed(gauge_idx, backlight_color[gauge_idx], new_on, clamped): + return + if clamped > 0: + backlight_brightness[gauge_idx] = clamped + backlight_on[gauge_idx] = new_on + r, g, b = backlight_color[gauge_idx] + set_backlight(gauge_idx, r, g, b, clamped) + _mark_bl_dirty() + + +def publish_backlight_states(client): + """Publish current backlight state for all gauges as retained MQTT messages.""" + for i in range(num_gauges): + gt = gauge_topics[i] + r, g, b = backlight_color[i] + brightness = backlight_brightness[i] + state = { + "state": "ON" if backlight_on[i] else "OFF", + "color_mode": "rgb", + "brightness": int(brightness * 2.55), + "color": {"r": r, "g": g, "b": b}, + } + try: + client.publish(gt["led_bl_state"], ujson.dumps(state), retain=True) + except Exception as e: + log_err(f"Backlight state publish failed for gauge {i}: {e}") + + +def _flush_backlight_state(): + global _bl_dirty_since + if _bl_dirty_since is None: + return + if utime.ticks_diff(utime.ticks_ms(), _bl_dirty_since) < _BL_SAVE_DELAY_MS: + return + if client_ref is None: + return + publish_backlight_states(client_ref) + _bl_dirty_since = None + info("Backlight state flushed to MQTT") + + +def _publish(topic, payload, retain=False): + """Safely publish MQTT message, returning True on success.""" + if client_ref is None: + return False + try: + client_ref.publish(topic, payload, retain=retain) + return True + except Exception as e: + log_err(f"MQTT publish failed: {e}") + return False + + +# --------------------------------------------------------------------------- +# Topics (per-gauge) +# --------------------------------------------------------------------------- + + +def make_gauge_topics(prefix, gauge_id): + return { + "set": f"{prefix}/gauge{gauge_id}/set", + "state": f"{prefix}/gauge{gauge_id}/state", + "status": f"{prefix}/gauge{gauge_id}/status", + "zero": f"{prefix}/gauge{gauge_id}/zero", + "disc": f"homeassistant/number/{MQTT_CLIENT_ID}_g{gauge_id}/config", + "led_red": f"{prefix}/gauge{gauge_id}/led/red/set", + "led_green": f"{prefix}/gauge{gauge_id}/led/green/set", + "led_bl": f"{prefix}/gauge{gauge_id}/led/backlight/set", + "led_red_state": f"{prefix}/gauge{gauge_id}/led/red/state", + "led_green_state": f"{prefix}/gauge{gauge_id}/led/green/state", + "led_bl_state": f"{prefix}/gauge{gauge_id}/led/backlight/state", + "led_red_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_red/config", + "led_green_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_green/config", + "led_bl_disc": f"homeassistant/light/{MQTT_CLIENT_ID}_g{gauge_id}_bl/config", + "status_red": f"{prefix}/gauge{gauge_id}/status_led/red/set", + "status_green": f"{prefix}/gauge{gauge_id}/status_led/green/set", + "status_red_state": f"{prefix}/gauge{gauge_id}/status_led/red/state", + "status_green_state": f"{prefix}/gauge{gauge_id}/status_led/green/state", + "status_red_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_status_red/config", + "status_green_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_status_green/config", + } + + +gauge_topics = [make_gauge_topics(MQTT_PREFIX, g["id"]) for g in gauges] + +T_SET = f"{MQTT_PREFIX}/set" +T_ZERO = f"{MQTT_PREFIX}/zero" + +_DEVICE = { + "identifiers": [MQTT_CLIENT_ID], + "name": DEVICE_NAME, + "model": DEVICE_MODEL, + "manufacturer": DEVICE_MFR, + "suggested_area": DEVICE_AREA, +} + +# --------------------------------------------------------------------------- +# WiFi +# --------------------------------------------------------------------------- + +_wifi_check_interval_ms = 30000 +_last_wifi_check = 0 +_wifi_sta = None + + +def connect_wifi(ssid, password, timeout_s=15): + global _wifi_sta + _wifi_sta = network.WLAN(network.STA_IF) + _wifi_sta.active(True) + if _wifi_sta.isconnected(): + ip, mask, gw, dns = _wifi_sta.ifconfig() + info("WiFi already connected") + info(f" IP:{ip} mask:{mask} gw:{gw} dns:{dns}") + return ip + info(f"WiFi connecting to '{ssid}' ...") + _wifi_sta.connect(ssid, password) + deadline = utime.time() + timeout_s + while not _wifi_sta.isconnected(): + if utime.time() > deadline: + log_err(f"WiFi connect timeout after {timeout_s}s") + raise OSError("WiFi connect timeout") + utime.sleep_ms(200) + ip, mask, gw, dns = _wifi_sta.ifconfig() + mac = ":".join(f"{b:02x}" for b in _wifi_sta.config("mac")) + info("WiFi connected!") + info(f" SSID : {ssid}") + info(f" MAC : {mac}") + info(f" IP : {ip} mask:{mask} gw:{gw} dns:{dns}") + return ip + + +def check_wifi(): + global _wifi_sta, _last_wifi_check + now = utime.ticks_ms() + if utime.ticks_diff(now, _last_wifi_check) < _wifi_check_interval_ms: + return + _last_wifi_check = now + + if _wifi_sta is None: + _wifi_sta = network.WLAN(network.STA_IF) + + if _wifi_sta.isconnected(): + return + + log_err("WiFi lost connection — attempting reconnect...") + try: + _wifi_sta.active(True) + _wifi_sta.connect(WIFI_SSID, WIFI_PASSWORD) + deadline = utime.time() + 15 + while not _wifi_sta.isconnected(): + if utime.time() > deadline: + log_err("WiFi reconnect timeout") + return + utime.sleep_ms(200) + ip, mask, gw, dns = _wifi_sta.ifconfig() + info(f"WiFi reconnected! IP:{ip}") + except Exception as e: + log_err(f"WiFi reconnect failed: {e}") + + +# --------------------------------------------------------------------------- +# MQTT callbacks +# --------------------------------------------------------------------------- + + +def on_message(topic, payload): + if client_ref is None: + return + topic = topic.decode() + payload = payload.decode().strip() + info(f"MQTT rx {topic} {payload}") + + for i, gt in enumerate(gauge_topics): + if topic == gt["zero"]: + info(f"Home command received for gauge {i}") + gauge_home(i) + gauge_last_rezero[i] = utime.ticks_ms() + return + + if topic == gt["set"]: + g = gauges[i] + try: + val = max(g["min"], min(g["max"], float(payload))) + gauge_targets[i] = val + gauge_set(i, val) + info(f"Gauge {i} target → {val:.1f}") + except ValueError: + warn(f"Invalid set value for gauge {i}: '{payload}'") + return + + if topic == gt["led_red"]: + state = payload.upper() == "ON" + set_red_led(i, state) + _publish(gt["led_red_state"], "ON" if state else "OFF", retain=True) + info(f"Gauge {i} red LED → {'ON' if state else 'OFF'}") + return + + if topic == gt["led_green"]: + state = payload.upper() == "ON" + set_green_led(i, state) + _publish(gt["led_green_state"], "ON" if state else "OFF", retain=True) + info(f"Gauge {i} green LED → {'ON' if state else 'OFF'}") + return + + if topic == gt["led_bl"]: + try: + data = ujson.loads(payload) + if data.get("state", "ON").upper() == "OFF": + set_backlight_brightness(i, 0) + _publish( + gt["led_bl_state"], ujson.dumps({"state": "OFF"}), retain=True + ) + info(f"Gauge {i} backlight → OFF") + return + color = data.get("color", {}) + r = max(0, min(255, int(color.get("r", backlight_color[i][0])))) + g = max(0, min(255, int(color.get("g", backlight_color[i][1])))) + b = max(0, min(255, int(color.get("b", backlight_color[i][2])))) + raw_br = data.get("brightness", None) + if raw_br is not None: + brightness = max(0, min(100, round(int(raw_br) / 2.55))) + elif backlight_brightness[i] > 0: + brightness = backlight_brightness[i] + else: + brightness = 100 + except Exception as e: + warn(f"Invalid backlight payload for gauge {i}: '{payload}' ({e})") + return + set_backlight_color(i, r, g, b, brightness) + state = { + "state": "ON", + "color_mode": "rgb", + "brightness": int(brightness * 2.55), + "color": {"r": r, "g": g, "b": b}, + } + _publish(gt["led_bl_state"], ujson.dumps(state), retain=True) + info(f"Gauge {i} backlight → #{r:02x}{g:02x}{b:02x} @ {brightness}%") + return + + if topic == gt["status_red"]: + state = payload.upper() == "ON" + set_status_led(i, "red", state) + _publish(gt["status_red_state"], "ON" if state else "OFF", retain=True) + info(f"Gauge {i} status red → {'ON' if state else 'OFF'}") + return + + if topic == gt["status_green"]: + state = payload.upper() == "ON" + set_status_led(i, "green", state) + _publish(gt["status_green_state"], "ON" if state else "OFF", retain=True) + info(f"Gauge {i} status green → {'ON' if state else 'OFF'}") + return + + if topic == T_ZERO: + for i in range(num_gauges): + gauge_home(i) + gauge_last_rezero[i] = utime.ticks_ms() + info("All gauges homed") + return + + if topic == T_SET: + try: + data = ujson.loads(payload) + if isinstance(data, dict): + for i, val in enumerate(data.values()): + if i < num_gauges: + g = gauges[i] + cval = max(g["min"], min(g["max"], float(val))) + gauge_targets[i] = cval + gauge_set(i, cval) + info(f"Gauge {i} target → {cval:.1f}") + else: + val = float(payload) + for i in range(num_gauges): + g = gauges[i] + cval = max(g["min"], min(g["max"], val)) + gauge_targets[i] = cval + gauge_set(i, cval) + info(f"All gauges target → {val:.1f}") + except Exception: + try: + val = float(payload) + for i in range(num_gauges): + g = gauges[i] + cval = max(g["min"], min(g["max"], val)) + gauge_targets[i] = cval + gauge_set(i, cval) + info(f"All gauges target → {val:.1f}") + except: + warn(f"Invalid set value: '{payload}'") + return + + +# --------------------------------------------------------------------------- +# MQTT connect + discovery +# --------------------------------------------------------------------------- + + +def _subscribe_all(c): + c.subscribe(f"{MQTT_PREFIX}/set") + c.subscribe(f"{MQTT_PREFIX}/zero") + for i in range(num_gauges): + prefix = f"{MQTT_PREFIX}/gauge{i}" + c.subscribe(f"{prefix}/set") + c.subscribe(f"{prefix}/zero") + c.subscribe(f"{prefix}/led/red/set") + c.subscribe(f"{prefix}/led/green/set") + c.subscribe(f"{prefix}/led/backlight/set") + c.subscribe(f"{prefix}/status_led/red/set") + c.subscribe(f"{prefix}/status_led/green/set") + + +def connect_mqtt(): + global client_ref, _mqtt_connected + info(f"Connecting to MQTT broker {MQTT_BROKER}:{MQTT_PORT} ...") + client = MQTTClient( + client_id=MQTT_CLIENT_ID, + server=MQTT_BROKER, + port=MQTT_PORT, + user=MQTT_USER, + password=MQTT_PASSWORD, + keepalive=30, + ) + client.set_callback(on_message) + client.connect() + client_ref = client + _mqtt_connected = True + info(f"MQTT connected client_id={MQTT_CLIENT_ID}") + + +_mqtt_check_interval_ms = 30000 +_last_mqtt_check = 0 + + +def check_mqtt(): + global client_ref, _mqtt_connected, _last_mqtt_check + now = utime.ticks_ms() + if utime.ticks_diff(now, _last_mqtt_check) < _mqtt_check_interval_ms: + return _mqtt_connected + _last_mqtt_check = now + + if client_ref is None: + return False + + try: + client_ref.ping() + _mqtt_connected = True + return True + except Exception as e: + log_err(f"MQTT connection lost: {e}") + _mqtt_connected = False + + log_err("Attempting MQTT reconnection...") + for attempt in range(3): + try: + client_ref = MQTTClient( + client_id=MQTT_CLIENT_ID, + server=MQTT_BROKER, + port=MQTT_PORT, + user=MQTT_USER, + password=MQTT_PASSWORD, + keepalive=30, + ) + client_ref.set_callback(on_message) + client_ref.connect() + _mqtt_connected = True + info("MQTT reconnected!") + publish_discovery(client_ref) + _subscribe_all(client_ref) + publish_state(client_ref) + publish_backlight_states(client_ref) + return True + except Exception as e2: + log_err(f"MQTT reconnect attempt {attempt + 1} failed: {e2}") + utime.sleep_ms(2000) + + log_err("MQTT reconnection failed after 3 attempts") + return False + + +def publish_discovery(client): + """Publish all HA MQTT discovery payloads for gauges and LEDs.""" + _dev_ref = _DEVICE + + for i, g in enumerate(gauges): + gt = gauge_topics[i] + + client.publish( + gt["disc"], + ujson.dumps( + { + "name": g["entity_name"], + "unique_id": f"{MQTT_CLIENT_ID}_g{i}", + "cmd_t": gt["set"], + "stat_t": gt["state"], + "avty_t": gt["status"], + "min": g["min"], + "max": g["max"], + "step": 1, + "unit_of_meas": g["unit"], + "icon": "mdi:gauge", + "dev": _dev_ref, + } + ), + retain=True, + ) + info(f"Discovery: gauge {i} ({g['name']})") + + for _ in range(5): + client.check_msg() + utime.sleep_ms(10) + + client.publish( + gt["led_red_disc"], + ujson.dumps( + { + "name": f"{g['name']} Red LED", + "uniq_id": f"{MQTT_CLIENT_ID}_g{i}_red", + "cmd_t": gt["led_red"], + "stat_t": gt["led_red_state"], + "pl_on": "ON", + "pl_off": "OFF", + "icon": "mdi:led-on", + "dev": _dev_ref, + "ret": True, + } + ), + retain=True, + ) + info(f"Discovery: gauge {i} red LED") + + client.publish( + gt["led_green_disc"], + ujson.dumps( + { + "name": f"{g['name']} Green LED", + "uniq_id": f"{MQTT_CLIENT_ID}_g{i}_green", + "cmd_t": gt["led_green"], + "stat_t": gt["led_green_state"], + "pl_on": "ON", + "pl_off": "OFF", + "icon": "mdi:led-on", + "dev": _dev_ref, + "ret": True, + } + ), + retain=True, + ) + info(f"Discovery: gauge {i} green LED") + + for _ in range(5): + client.check_msg() + utime.sleep_ms(10) + + client.publish( + gt["led_bl_disc"], + ujson.dumps( + { + "name": f"{g['name']} Backlight", + "uniq_id": f"{MQTT_CLIENT_ID}_g{i}_bl", + "cmd_t": gt["led_bl"], + "stat_t": gt["led_bl_state"], + "schema": "json", + "supported_color_modes": ["rgb"], + "icon": "mdi:led-strip", + "dev": _dev_ref, + "ret": True, + } + ), + retain=True, + ) + info(f"Discovery: gauge {i} backlight") + + client.publish( + gt["status_red_disc"], + ujson.dumps( + { + "name": f"{g['name']} Status Red", + "uniq_id": f"{MQTT_CLIENT_ID}_g{i}_status_red", + "cmd_t": gt["status_red"], + "stat_t": gt["status_red_state"], + "pl_on": "ON", + "pl_off": "OFF", + "icon": "mdi:led-on", + "dev": _dev_ref, + "ret": True, + } + ), + retain=True, + ) + info(f"Discovery: gauge {i} status red") + + client.publish( + gt["status_green_disc"], + ujson.dumps( + { + "name": f"{g['name']} Status Green", + "uniq_id": f"{MQTT_CLIENT_ID}_g{i}_status_green", + "cmd_t": gt["status_green"], + "stat_t": gt["status_green_state"], + "pl_on": "ON", + "pl_off": "OFF", + "icon": "mdi:led-on", + "dev": _dev_ref, + "ret": True, + } + ), + retain=True, + ) + info(f"Discovery: gauge {i} status green") + + for _ in range(5): + client.check_msg() + utime.sleep_ms(10) + + +def publish_state(client): + for i in range(num_gauges): + gt = gauge_topics[i] + client.publish(gt["state"], str(gauge_targets[i])) + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + + +def main(): + info("=" * 48) + info("Gauge MQTT controller starting") + info("=" * 48) + + connect_wifi(WIFI_SSID, WIFI_PASSWORD) + + connect_mqtt() + + info("Publishing discovery...") + publish_discovery(client_ref) + + _subscribe_all(client_ref) + info("Draining initial retained messages...") + for _ in range(50): + client_ref.check_msg() + utime.sleep_ms(20) + + info("Homing all gauges on startup ...") + arduino_send("HOMEALL") + for i in range(num_gauges): + gauge_last_rezero[i] = utime.ticks_ms() + info("Home command sent") + + info("Publishing state...") + publish_state(client_ref) + utime.sleep_ms(50) + for _ in range(5): + client_ref.check_msg() + utime.sleep_ms(20) + + info("Entering main loop") + info("-" * 48) + + try: + import ota + ota.mark_ok() + except: + pass + + last_heartbeat = utime.ticks_ms() + + while True: + try: + now = utime.ticks_ms() + + check_wifi() + + if not check_mqtt(): + utime.sleep_ms(1000) + continue + + client_ref.check_msg() + _flush_backlight_state() + + # Periodic re-home + for i in range(num_gauges): + if utime.ticks_diff(now, gauge_last_rezero[i]) >= REZERO_INTERVAL_MS: + info(f"Periodic re-home: gauge {i}") + gauge_home(i) + gauge_last_rezero[i] = now + + if utime.ticks_diff(now, last_heartbeat) >= HEARTBEAT_MS: + info(f"Heartbeat: {gauge_targets}") + publish_state(client_ref) + last_heartbeat = now + + utime.sleep_ms(10) + + except Exception as e: + import sys + sys.print_exception(e) + log_err(f"Main loop error: {e} — continuing") + utime.sleep_ms(100) + + +main() diff --git a/ota.py b/ota.py new file mode 100644 index 0000000..53c6706 --- /dev/null +++ b/ota.py @@ -0,0 +1,474 @@ +""" +ota.py — Gitea OTA updater for ESP32 / MicroPython + +Call ota.update() from boot.py before importing anything else. +If the update or the subsequent boot fails, the updater retries +on the next boot rather than bricking the device. + +Strategy +-------- +1. Check if last boot was good (OK flag exists). +2. If good, fetch remote commit SHA and compare with local — if unchanged, + skip file check entirely. +3. If new commit or failed boot, fetch ota_manifest.txt from the repo + to determine which files to sync. +4. Compare SHA1 hashes with a local manifest (.ota_manifest.json). +5. Download only changed or missing files, writing to .tmp first. +6. On success, rename .tmp files into place and update the manifest. +7. If anything fails mid-update, the manifest is not updated, so the + next boot will retry. Partially written .tmp files are cleaned up. +8. A "safety" flag file (.ota_ok) is written by main.py on successful + startup. If it is absent on boot, the previous update is suspected + bad — the manifest is wiped so all files are re-fetched cleanly. + +Manifest format (ota_manifest.txt) +--------------------------------- +Each line specifies a file or directory to include: + boot.py # specific file + ota.py # another file + selsyn/ # entire directory (trailing slash) + lib/ # another directory + *.py # wildcard (matches anywhere) + selsyn/*.py # wildcard in subdirectory + +Usage in boot.py +---------------- + import ota + ota.update() + # imports of main etc. go here + +Configuration +------------- +Edit the block below, or override from a local config file +(see SETTINGS_FILE). All settings can be left as module-level +constants or placed in /ota_config.json: + { + "gitea_base": "http://git.baumann.gr", + "repo_owner": "adebaumann", + "repo_name": "HomeControlPanel", + "repo_folder": "firmware", + "repo_branch": "main", + "api_token": "nicetry-nothere" + } +""" + +import os +import gc +import sys +import ujson +import urequests +import utime + +# --------------------------------------------------------------------------- +# Default configuration — override via /ota_config.json +# --------------------------------------------------------------------------- + +GITEA_BASE = "http://git.baumann.gr" # no trailing slash +REPO_OWNER = "adrian" +REPO_NAME = "esp32-gauge" +REPO_FOLDER = "firmware" # folder inside repo to sync +REPO_BRANCH = "main" +API_TOKEN = None # set to string for private repos + +WIFI_SSID = None +WIFI_PASSWORD = None + +SETTINGS_FILE = "/ota_config.json" +MANIFEST_FILE = "/.ota_manifest.json" +OK_FLAG_FILE = "/.ota_ok" +OTA_MANIFEST = "ota_manifest.txt" + +# --------------------------------------------------------------------------- +# Logging +# --------------------------------------------------------------------------- + + +def _ts(): + ms = utime.ticks_ms() + return f"{(ms // 3600000) % 24:02d}:{(ms // 60000) % 60:02d}:{(ms // 1000) % 60:02d}.{ms % 1000:03d}" + + +def _log(level, msg): + print(f"[{_ts()}] {level:5s} [OTA] {msg}") + + +def info(msg): + _log("INFO", msg) + + +def warn(msg): + _log("WARN", msg) + + +def log_err(msg): + _log("ERROR", msg) + + +# --------------------------------------------------------------------------- +# HTTP helpers +# --------------------------------------------------------------------------- + + +def _headers(): + h = {"Accept": "application/json"} + if API_TOKEN: + h["Authorization"] = f"token {API_TOKEN}" + return h + + +# --------------------------------------------------------------------------- +# Config loader +# --------------------------------------------------------------------------- + + +def load_config(): + global \ + GITEA_BASE, \ + REPO_OWNER, \ + REPO_NAME, \ + REPO_FOLDER, \ + REPO_BRANCH, \ + API_TOKEN, \ + WIFI_SSID, \ + WIFI_PASSWORD + try: + with open(SETTINGS_FILE) as f: + cfg = ujson.load(f) + GITEA_BASE = cfg.get("gitea_base", GITEA_BASE) + REPO_OWNER = cfg.get("repo_owner", REPO_OWNER) + REPO_NAME = cfg.get("repo_name", REPO_NAME) + REPO_FOLDER = cfg.get("repo_folder", REPO_FOLDER) + REPO_BRANCH = cfg.get("repo_branch", REPO_BRANCH) + API_TOKEN = cfg.get("api_token", API_TOKEN) + WIFI_SSID = cfg.get("wifi_ssid", WIFI_SSID) + WIFI_PASSWORD = cfg.get("wifi_password", WIFI_PASSWORD) + info(f"Config loaded from {SETTINGS_FILE}") + except OSError: + info(f"No {SETTINGS_FILE} found — using defaults") + except Exception as e: + warn(f"Config parse error: {e} — using defaults") + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _match_pattern(name, pattern): + if "*" not in pattern: + return name == pattern + i, n = 0, len(pattern) + j, m = 0, len(name) + star = -1 + while i < n and j < m: + if pattern[i] == "*": + star = i + i += 1 + elif pattern[i] == name[j]: + i += 1 + j += 1 + elif star >= 0: + i = star + 1 + j += 1 + else: + return False + while i < n and pattern[i] == "*": + i += 1 + return i == n and j == m + + +def _fetch_commit_sha(): + url = f"{GITEA_BASE}/api/v1/repos/{REPO_OWNER}/{REPO_NAME}/branches/{REPO_BRANCH}" + try: + r = urequests.get(url, headers=_headers()) + if r.status_code == 200: + data = r.json() + r.close() + return data.get("commit", {}).get("id") + r.close() + except Exception as e: + log_err(f"Failed to fetch commit: {e}") + return None + + +def _fetch_manifest(): + url = ( + f"{GITEA_BASE}/api/v1/repos/{REPO_OWNER}/{REPO_NAME}" + f"/contents/{OTA_MANIFEST}?ref={REPO_BRANCH}" + ) + try: + r = urequests.get(url, headers=_headers()) + try: + if r.status_code == 200: + data = r.json() + if data.get("content"): + import ubinascii + + content = ubinascii.a2b_base64(data["content"]).decode() + patterns = [line.strip() for line in content.splitlines()] + return [p for p in patterns if p and not p.startswith("#")] + else: + warn(f"Manifest not found at {OTA_MANIFEST}") + finally: + r.close() + except Exception as e: + log_err(f"Failed to fetch manifest: {e}") + return None + + +def _fetch_dir(path): + url = ( + f"{GITEA_BASE}/api/v1/repos/{REPO_OWNER}/{REPO_NAME}" + f"/contents/{path}?ref={REPO_BRANCH}" + ) + return _api_get(url) + + +def _api_get(url): + """GET a URL and return parsed JSON, or None on failure.""" + try: + r = urequests.get(url, headers=_headers()) + if r.status_code == 200: + data = r.json() + r.close() + return data + warn(f"HTTP {r.status_code} for {url}") + r.close() + except Exception as e: + log_err(f"GET {url} failed: {e}") + return None + + +def _download(url, dest_path): + """Download url to dest_path. Returns True on success.""" + tmp = dest_path + ".tmp" + try: + r = urequests.get(url, headers=_headers()) + if r.status_code != 200: + warn(f"Download failed HTTP {r.status_code}: {url}") + r.close() + return False + with open(tmp, "wb") as f: + f.write(r.content) + r.close() + # Rename into place + try: + os.remove(dest_path) + except OSError: + pass + os.rename(tmp, dest_path) + return True + except Exception as e: + log_err(f"Download error {url}: {e}") + try: + os.remove(tmp) + except OSError: + pass + return False + + +def _load_manifest(): + try: + with open(MANIFEST_FILE) as f: + return ujson.load(f) + except Exception: + return {} + + +def _save_manifest(manifest, commit_sha=None): + try: + with open(MANIFEST_FILE, "w") as f: + if commit_sha: + manifest["_commit"] = commit_sha + ujson.dump(manifest, f) + except Exception as e: + warn(f"Could not save manifest: {e}") + + + +def _ok_flag_exists(): + try: + os.stat(OK_FLAG_FILE) + return True + except OSError: + return False + + +def _clear_ok_flag(): + try: + os.remove(OK_FLAG_FILE) + except OSError: + pass + + +def mark_ok(): + """ + Call this from main.py after successful startup. + Signals to the OTA updater that the last update was good. + """ + try: + with open(OK_FLAG_FILE, "w") as f: + f.write("ok") + except Exception as e: + warn(f"Could not write OK flag: {e}") + + +# --------------------------------------------------------------------------- +# Core update logic +# --------------------------------------------------------------------------- + + +def _fetch_file_list(): + """ + Returns list of {name, sha, download_url} dicts based on the + ota_manifest.txt patterns in the repo folder, or None on failure. + """ + manifest_patterns = _fetch_manifest() + if manifest_patterns is None: + log_err("No manifest — cannot determine what to fetch") + return None + + info(f"Manifest patterns: {manifest_patterns}") + files = [] + visited = set() + + def fetch_matching(entries, patterns): + for entry in entries: + if entry.get("type") == "dir": + for p in patterns: + if p.endswith("/") and entry["name"].startswith(p.rstrip("/")): + sub = _fetch_dir(entry["path"]) + if sub: + fetch_matching(sub, patterns) + break + else: + name = entry["name"] + for p in patterns: + p = p.rstrip("/") + if _match_pattern(name, p) or _match_pattern(entry["path"], p): + if entry["path"] not in visited: + visited.add(entry["path"]) + files.append( + { + "name": entry["path"], + "sha": entry["sha"], + "download_url": entry["download_url"], + } + ) + break + + root = _fetch_dir(REPO_FOLDER) + if root is None: + return None + + fetch_matching(root, manifest_patterns) + return files + + +def _do_update(commit_sha=None): + """ + Fetch file list, download changed files, update manifest. + Returns True if all succeeded (or nothing needed updating). + """ + info( + f"Checking {GITEA_BASE}/{REPO_OWNER}/{REPO_NAME}/{REPO_FOLDER} @ {REPO_BRANCH}" + ) + file_list = _fetch_file_list() + if file_list is None: + log_err("Could not fetch file list — skipping update") + return False + + info(f"Found {len(file_list)} file(s) to sync") + manifest = _load_manifest() + updated = [] + failed = [] + + for entry in file_list: + name = entry["name"] + sha = entry["sha"] + + if manifest.get(name) == sha: + info(f" {name} up to date") + continue + + info(f" {name} updating (sha={sha[:8]}...)") + gc.collect() + ok = _download(entry["download_url"], f"/{name}") + if ok: + manifest[name] = sha + updated.append(name) + info(f" {name} OK") + else: + failed.append(name) + log_err(f" {name} FAILED") + + if failed: + log_err(f"Update incomplete — {len(failed)} file(s) failed: {failed}") + _save_manifest(manifest, commit_sha) + return False + + _save_manifest(manifest, commit_sha) + + if updated: + info(f"Update complete — {len(updated)} file(s) updated: {updated}") + else: + info("All files up to date — nothing to do") + + return True + + +# --------------------------------------------------------------------------- +# Public entry point +# --------------------------------------------------------------------------- + + +def update(): + """ + Main entry point. Call from boot.py before importing application code. + + - If the OK flag is missing, the previous boot is assumed to have + failed — wipes the manifest so everything is re-fetched cleanly. + - If the commit hash hasn't changed and last boot was good, skip + file comparison entirely. + - Runs the update. + - Clears the OK flag so main.py must re-assert it on successful start. + """ + info("=" * 40) + info("OTA updater starting") + info("=" * 40) + + load_config() + + ok_flag = _ok_flag_exists() + manifest = _load_manifest() + + if not ok_flag: + warn("OK flag missing — last boot may have failed") + warn("Re-checking all files, will only download changed ones") + else: + info("OK flag present — last boot was good") + + commit_sha = _fetch_commit_sha() + + if ok_flag and commit_sha and manifest.get("_commit") == commit_sha: + info(f"Commit unchanged ({commit_sha[:8]}) — skipping file check") + info("-" * 40) + return + + if commit_sha: + info(f"Remote commit: {commit_sha[:8]}") + else: + warn("Could not fetch remote commit — proceeding with file check") + + # Clear the flag now; main.py must call ota.mark_ok() to re-set it + _clear_ok_flag() + + success = _do_update(commit_sha) + + if success: + info("OTA check complete — booting application") + else: + warn("OTA check had errors — booting with current files") + + info("-" * 40) + gc.collect() diff --git a/ota_config.example.json b/ota_config.example.json new file mode 100644 index 0000000..c98637a --- /dev/null +++ b/ota_config.example.json @@ -0,0 +1,9 @@ +{ + "gitea_base": "http://git.baumann.gr", + "repo_owner": "adebaumann", + "repo_name": "Selsyn_inspired_gauge", + "repo_folder": "", + "repo_branch": "main", + "wifi_ssid": "YourNetwork", + "wifi_password": "YourPassword" +} diff --git a/ota_manifest.txt b/ota_manifest.txt new file mode 100644 index 0000000..f104652 --- /dev/null +++ b/ota_manifest.txt @@ -0,0 +1 @@ +*.py