From 9cc1b695bd5572ea44c49344f2ce91a3d03fbea0 Mon Sep 17 00:00:00 2001 From: "Adrian A. Baumann" Date: Sat, 4 Apr 2026 16:47:39 +0200 Subject: [PATCH] Initial complete commit --- boot.py | 46 ++++ config.example.json | 32 +++ gauge_vid6008.py | 187 +++++++++++++++ gaugemqttcontinuous.py | 494 ++++++++++++++++++++++++++++++++++++++++ main.py | 1 + ota.py | 391 +++++++++++++++++++++++++++++++ ota_config.example.json | 9 + ota_manifest.txt | 1 + 8 files changed, 1161 insertions(+) create mode 100644 boot.py create mode 100644 config.example.json create mode 100644 gauge_vid6008.py create mode 100644 gaugemqttcontinuous.py create mode 100644 main.py create mode 100644 ota.py create mode 100644 ota_config.example.json create mode 100644 ota_manifest.txt diff --git a/boot.py b/boot.py new file mode 100644 index 0000000..e2e57e2 --- /dev/null +++ b/boot.py @@ -0,0 +1,46 @@ +""" +boot.py — runs before main.py on every ESP32 boot + +Connects WiFi, runs OTA update, then hands off to main.py. +Keep this file as simple as possible — it is never OTA-updated itself +(it lives outside the repo folder) so bugs here require USB to fix. +""" + +import network +import utime +import sys + +import ota + +ota.load_config() +WIFI_SSID, WIFI_PASSWORD = ota.WIFI_SSID, ota.WIFI_PASSWORD + +def _connect_wifi(timeout_s=20): + sta = network.WLAN(network.STA_IF) + sta.active(True) + sta.config(txpower=15) + if sta.isconnected(): + return True + sta.connect(WIFI_SSID, WIFI_PASSWORD) + deadline = utime.time() + timeout_s + while not sta.isconnected(): + if utime.time() > deadline: + return False + utime.sleep_ms(300) + return True + +if WIFI_SSID is None: + print("[boot] No WiFi credentials — cannot connect, skipping OTA") +elif _connect_wifi(): + ip = network.WLAN(network.STA_IF).ifconfig()[0] + print(f"[boot] WiFi connected — {ip}") + + try: + ota.update() + except Exception as e: + print(f"[boot] OTA error: {e} — continuing with existing files") + sys.print_exception(e) +else: + print("[boot] WiFi failed — skipping OTA, booting with existing files") + +# main.py runs automatically after boot.py diff --git a/config.example.json b/config.example.json new file mode 100644 index 0000000..547e482 --- /dev/null +++ b/config.example.json @@ -0,0 +1,32 @@ +{ + "wifi_ssid": "YourWiFiSSID", + "wifi_password": "YourWiFiPassword", + "mqtt_broker": "mqtt.example.com", + "mqtt_port": 1883, + "mqtt_user": "mqtt_user", + "mqtt_password": "mqtt_password", + "mqtt_client_id": "selsyn1", + "mqtt_prefix": "home/panels/chernobyl1/gauge1", + "gauge_pins": [12, 13, 26, 27], + "gauge_min": 0, + "gauge_max": 7300, + "gauge_pulse": 4, + "gauge_half_step": false, + "smooth_step_ms": 50, + "heartbeat_ms": 10000, + "idle_release_ms": 3000, + "rezero_interval_ms": 3600000, + "led_red_pin": 33, + "led_green_pin": 32, + "led_bl_pin": 23, + "device_name": "Selsyn 1", + "device_model": "Chernobyl Selsyn-inspired gauge", + "device_manufacturer": "AdeBaumann", + "device_area": "Control Panels", + "gauge_entity_name": "Selsyn 1 Power", + "gauge_unit": "W", + "red_led_entity_name": "Selsyn 1 Red LED", + "green_led_entity_name": "Selsyn 1 Green LED", + "backlight_entity_name": "Selsyn 1 Backlight", + "backlight_unit": "%" +} diff --git a/gauge_vid6008.py b/gauge_vid6008.py new file mode 100644 index 0000000..47bd14f --- /dev/null +++ b/gauge_vid6008.py @@ -0,0 +1,187 @@ +""" +gauge_vid6008.py — Automotive stepper gauge driver for ESP32 / MicroPython + +Supports two driver modes: + + 1. STEP/DIR mode (e.g. VID-6008): 2 pins — DIR and STEP + Each step requires a pulse on STEP; DIR sets direction. + + 2. 4-PHASE mode (e.g. VID28/BKA30D/Switec X25): 4 pins — IN1..IN4 + Driven via two H-bridge pairs (ULN2003 or direct GPIO). +""" + +from machine import Pin +import utime + + +_TOTAL_STEPS = 3780 +_OVERRUN_STEPS = 200 + +_FULL_SEQUENCE = [ + (1, 0, 1, 0), + (0, 1, 1, 0), + (0, 1, 0, 1), + (1, 0, 0, 1), +] + + +class Gauge: + """ + Analog-style stepper gauge driver. + + Parameters + ---------- + pins : tuple[int, ...] + 2 pins for STEP/DIR mode: (DIR, STEP) + 4 pins for 4-phase mode: (IN1, IN2, IN3, IN4) + + mode : str + "stepdir" for VID-6008 style (2 pins), "4phase" for VID28/X25 (4 pins). + + min_val : float + Value that corresponds to the physical lower stop. + + max_val : float + Value that corresponds to the physical upper stop. + + step_us : int + Delay between steps in microseconds (default 200). + """ + + def __init__(self, pins, mode="4phase", min_val=0, max_val=100, step_us=200): + self._mode = mode + self._step_us = step_us + self._min_val = min_val + self._max_val = max_val + + if mode == "stepdir": + if len(pins) != 2: + raise ValueError("stepdir mode requires 2 pins: (DIR, STEP)") + self._pin_dir = Pin(pins[0], Pin.OUT) + self._pin_step = Pin(pins[1], Pin.OUT) + self._total_steps = _TOTAL_STEPS + self._phase = 0 + + elif mode == "4phase": + if len(pins) != 4: + raise ValueError("4phase mode requires 4 pins: (IN1, IN2, IN3, IN4)") + self._pins = [Pin(p, Pin.OUT) for p in pins] + self._total_steps = _TOTAL_STEPS + self._sequence = _FULL_SEQUENCE + self._phase = 0 + else: + raise ValueError("mode must be 'stepdir' or '4phase'") + + self._current_step = 0 + self._zeroed = False + + def zero(self): + overrun = _OVERRUN_STEPS + if self._mode == "stepdir": + self._pin_dir.value(0) + utime.sleep_us(10) + for _ in range(_TOTAL_STEPS + overrun): + self._pulse_step(500) + else: + for _ in range(_TOTAL_STEPS + overrun): + self._step(-1) + utime.sleep_ms(2) + + self._current_step = 0 + self._zeroed = True + self._release() + + def set(self, value, release=True): + if not self._zeroed: + raise RuntimeError("Call zero() before set()") + + value = max(self._min_val, min(self._max_val, value)) + target_step = self._val_to_step(value) + delta = target_step - self._current_step + + if delta == 0: + return + + if self._mode == "stepdir": + self._move_steps(delta) + else: + for _ in range(abs(delta)): + self._step(1 if delta > 0 else -1) + utime.sleep_ms(self._step_us) + + self._current_step = target_step + + if release: + self._release() + + def _move_steps(self, delta): + direction = 1 if delta > 0 else 0 + self._pin_dir.value(direction) + utime.sleep_us(10) + for _ in range(abs(delta)): + self._pulse_step(self._step_us) + + def _pulse_step(self, delay_us): + self._pin_step.value(1) + utime.sleep_us(2) + self._pin_step.value(0) + utime.sleep_us(delay_us) + + def get(self): + return self._step_to_val(self._current_step) + + def release(self): + self._release() + + def step(self, direction): + if self._mode == "stepdir": + self._pin_dir.value(1 if direction > 0 else 0) + utime.sleep_us(10) + self._pulse_step(self._step_us) + self._current_step += direction + else: + self._step(direction) + + def _step(self, direction): + steps_in_seq = len(self._sequence) + self._phase = (self._phase + direction) % steps_in_seq + seq = self._sequence[self._phase] + for pin, state in zip(self._pins, seq): + pin.value(state) + self._current_step += direction + + def _release(self): + if self._mode == "stepdir": + self._pin_step.value(0) + else: + for pin in self._pins: + pin.value(0) + + def _val_to_step(self, value): + frac = (value - self._min_val) / (self._max_val - self._min_val) + return int(round(frac * self._total_steps)) + + def _step_to_val(self, step): + frac = step / self._total_steps + return self._min_val + frac * (self._max_val - self._min_val) + + +if __name__ == "__main__": + g = Gauge( + pins=(12, 13), + mode="stepdir", + min_val=0, + max_val=100, + step_us=200, + ) + + print("Zeroing gauge...") + g.zero() + print("Zero complete.") + + for target in [25, 50, 75, 100, 50, 0]: + print("Moving to {} (currently at {:.1f})".format(target, g.get())) + g.set(target) + utime.sleep_ms(500) + + print("Done.") diff --git a/gaugemqttcontinuous.py b/gaugemqttcontinuous.py new file mode 100644 index 0000000..520039b --- /dev/null +++ b/gaugemqttcontinuous.py @@ -0,0 +1,494 @@ +""" +gaugemqtt.py — MQTT-based gauge controller for ESP32 / MicroPython + +Deploy these files to the ESP32: + gauge.py — stepper driver + gaugemqtt.py — this file + umqtt/simple.py — MicroPython built-in + umqtt/robust.py — https://raw.githubusercontent.com/micropython/micropython-lib/master/micropython/umqtt.robust/umqtt/robust.py + config.json — configuration (see below) + +MQTT topics (all prefixed with mqtt_prefix from config.json): + .../set ← HA publishes target value here + .../state → ESP32 publishes current value + .../status → ESP32 publishes online/offline + .../zero ← publish anything to trigger zero + .../led/red/set ← ON/OFF + .../led/green/set ← ON/OFF + .../led/backlight/set ← 0-100 + +Serial log format: [HH:MM:SS.mmm] LEVEL message +""" + +import network +import utime +import ujson +from umqtt.robust import MQTTClient +from machine import Pin +from neopixel import NeoPixel +from gauge_vid6008 import Gauge + +# --------------------------------------------------------------------------- +# Logging +# --------------------------------------------------------------------------- + +def _ts(): + ms = utime.ticks_ms() + return f"{(ms//3600000)%24:02d}:{(ms//60000)%60:02d}:{(ms//1000)%60:02d}.{ms%1000:03d}" + +def log(level, msg): print(f"[{_ts()}] {level:5s} {msg}") +def info(msg): log("INFO", msg) +def warn(msg): log("WARN", msg) +def log_err(msg): log("ERROR", msg) + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +def _load_config(): + try: + with open("/config.json") as f: + cfg = ujson.load(f) + info("Config loaded from /config.json") + return cfg + except OSError: + log_err("config.json not found — cannot continue") + raise + except Exception as e: + log_err(f"config.json parse error: {e} — cannot continue") + raise + +_cfg = _load_config() + +WIFI_SSID = _cfg["wifi_ssid"] +WIFI_PASSWORD = _cfg["wifi_password"] +MQTT_BROKER = _cfg["mqtt_broker"] +MQTT_PORT = int(_cfg.get("mqtt_port", 1883)) +MQTT_USER = _cfg["mqtt_user"] +MQTT_PASSWORD = _cfg["mqtt_password"] +MQTT_CLIENT_ID = _cfg["mqtt_client_id"] +MQTT_PREFIX = _cfg["mqtt_prefix"] +GAUGE_PINS = tuple(_cfg.get("gauge_pins", [12, 13])) +GAUGE_MODE = _cfg.get("gauge_mode", "stepdir") +GAUGE_MIN = float(_cfg.get("gauge_min", 0)) +GAUGE_MAX = float(_cfg.get("gauge_max", 7300)) +GAUGE_STEP_US = int(_cfg.get("gauge_step_us", 200)) + +MICROSTEPS_PER_SECOND = 600 # microsteps per second (adjustable) + +HEARTBEAT_MS = int(_cfg.get("heartbeat_ms", 10000)) +REZERO_INTERVAL_MS = int(_cfg.get("rezero_interval_ms", 3600000)) +LED_RED_PIN = int(_cfg.get("led_red_pin", 33)) +LED_GREEN_PIN = int(_cfg.get("led_green_pin", 32)) +LED_BL_PIN = int(_cfg.get("led_bl_pin", 23)) +DEVICE_NAME = _cfg.get("device_name", "Selsyn 1") +DEVICE_MODEL = _cfg.get("device_model", "Chernobyl Selsyn-inspired gauge") +DEVICE_MFR = _cfg.get("device_manufacturer", "AdeBaumann") +DEVICE_AREA = _cfg.get("device_area", "Control Panels") +GAUGE_ENTITY = _cfg.get("gauge_entity_name", "Selsyn 1 Power") +GAUGE_UNIT = _cfg.get("gauge_unit", "W") +RED_ENTITY = _cfg.get("red_led_entity_name", "Selsyn 1 Red LED") +GREEN_ENTITY = _cfg.get("green_led_entity_name", "Selsyn 1 Green LED") +BL_ENTITY = _cfg.get("backlight_entity_name", "Selsyn 1 Backlight") +BL_UNIT = _cfg.get("backlight_unit", "%") + +# --------------------------------------------------------------------------- +# Topics +# --------------------------------------------------------------------------- + +T_SET = f"{MQTT_PREFIX}/set" +T_STATE = f"{MQTT_PREFIX}/state" +T_STATUS = f"{MQTT_PREFIX}/status" +T_ZERO = f"{MQTT_PREFIX}/zero" +T_LED_RED = f"{MQTT_PREFIX}/led/red/set" +T_LED_GREEN = f"{MQTT_PREFIX}/led/green/set" +T_LED_BL = f"{MQTT_PREFIX}/led/backlight/set" +T_LED_BL_STATE = f"{MQTT_PREFIX}/led/backlight/state" +T_LED_RED_STATE = f"{MQTT_PREFIX}/led/red/state" +T_LED_GREEN_STATE = f"{MQTT_PREFIX}/led/green/state" + +T_DISC_GAUGE = f"homeassistant/number/{MQTT_CLIENT_ID}/config" +T_DISC_RED = f"homeassistant/switch/{MQTT_CLIENT_ID}_red/config" +T_DISC_GREEN = f"homeassistant/switch/{MQTT_CLIENT_ID}_green/config" +T_DISC_BL = f"homeassistant/light/{MQTT_CLIENT_ID}_bl/config" + +_DEVICE = { + "identifiers": [MQTT_CLIENT_ID], + "name": DEVICE_NAME, + "model": DEVICE_MODEL, + "manufacturer": DEVICE_MFR, + "suggested_area": DEVICE_AREA, +} + +# --------------------------------------------------------------------------- +# WiFi +# --------------------------------------------------------------------------- + +def connect_wifi(ssid, password, timeout_s=15): + sta = network.WLAN(network.STA_IF) + sta.active(True) + if sta.isconnected(): + ip, mask, gw, dns = sta.ifconfig() + info("WiFi already connected") + info(f" IP:{ip} mask:{mask} gw:{gw} dns:{dns}") + return ip + info(f"WiFi connecting to '{ssid}' ...") + sta.connect(ssid, password) + deadline = utime.time() + timeout_s + while not sta.isconnected(): + if utime.time() > deadline: + log_err(f"WiFi connect timeout after {timeout_s}s") + raise OSError("WiFi connect timeout") + utime.sleep_ms(200) + ip, mask, gw, dns = sta.ifconfig() + mac = ':'.join(f'{b:02x}' for b in sta.config('mac')) + info("WiFi connected!") + info(f" SSID : {ssid}") + info(f" MAC : {mac}") + info(f" IP : {ip} mask:{mask} gw:{gw} dns:{dns}") + return ip + +# --------------------------------------------------------------------------- +# Gauge +# --------------------------------------------------------------------------- + +info("Initialising gauge ...") +gauge = Gauge(pins=GAUGE_PINS, mode=GAUGE_MODE, min_val=GAUGE_MIN, max_val=GAUGE_MAX, step_us=GAUGE_STEP_US) +info(f"Gauge ready pins={GAUGE_PINS} mode={GAUGE_MODE} range=[{GAUGE_MIN}, {GAUGE_MAX}] step_us={GAUGE_STEP_US}") + +# --------------------------------------------------------------------------- +# LEDs +# --------------------------------------------------------------------------- + +led_red = Pin(LED_RED_PIN, Pin.OUT, value=0) +led_green = Pin(LED_GREEN_PIN, Pin.OUT, value=0) +led_bl = NeoPixel(Pin(LED_BL_PIN), 3) + +_backlight_color = (0, 0, 0) +_backlight_brightness = 100 # last *active* brightness — never set to 0 +_backlight_on = False +_bl_dirty_since = None +_BL_SAVE_DELAY_MS = 5000 + +def _flush_backlight(client): + payload = { + "state": "ON" if _backlight_on else "OFF", + "color": {"r": _backlight_color[0], "g": _backlight_color[1], "b": _backlight_color[2]}, + "brightness": int(_backlight_brightness * 2.55), + } + client.publish(T_LED_BL, ujson.dumps(payload), retain=True) + info(f"Backlight state retained: {payload['state']} {_backlight_color} @ {_backlight_brightness}%") + +def _backlight_changed(new_color, new_on, new_brightness): + """Return True if any backlight property differs from current state.""" + return (new_color != _backlight_color or + new_on != _backlight_on or + (new_on and new_brightness != _backlight_brightness)) + +def _mark_bl_dirty(): + global _bl_dirty_since + _bl_dirty_since = utime.ticks_ms() + +def set_backlight_color(r, g, b, brightness=None): + global _backlight_color, _backlight_brightness, _backlight_on + if brightness is None: + brightness = _backlight_brightness + new_on = brightness > 0 + if not _backlight_changed((r, g, b), new_on, brightness): + return + _backlight_color = (r, g, b) + if brightness > 0: + _backlight_brightness = brightness + _backlight_on = new_on + scale = brightness / 100 + for i in range(3): + led_bl[i] = (int(g * scale), int(r * scale), int(b * scale)) + led_bl.write() + _mark_bl_dirty() + +def set_backlight_brightness(brightness): + global _backlight_brightness, _backlight_on + clamped = max(0, min(100, brightness)) + new_on = clamped > 0 + if not _backlight_changed(_backlight_color, new_on, clamped): + return + if clamped > 0: + _backlight_brightness = clamped + _backlight_on = new_on + r, g, b = _backlight_color + scale = clamped / 100 + for i in range(3): + led_bl[i] = (int(g * scale), int(r * scale), int(b * scale)) + led_bl.write() + _mark_bl_dirty() + +info(f"LEDs ready red={LED_RED_PIN} green={LED_GREEN_PIN} backlight={LED_BL_PIN}") + +# --------------------------------------------------------------------------- +# State +# --------------------------------------------------------------------------- + +_target_value = GAUGE_MIN +_last_rezero_ms = None # set to ticks_ms() in main() +client_ref = None + +# --------------------------------------------------------------------------- +# MQTT callbacks +# --------------------------------------------------------------------------- + +def on_message(topic, payload): + if client_ref is None: + return + topic = topic.decode() + payload = payload.decode().strip() + info(f"MQTT rx {topic} {payload}") + + if topic == T_ZERO: + global _last_rezero_ms + info("Zero command received") + gauge.zero() + _last_rezero_ms = utime.ticks_ms() + info("Zero complete") + return + + if topic == T_LED_RED: + state = payload.upper() == "ON" + led_red.value(1 if state else 0) + client_ref.publish(T_LED_RED_STATE, "ON" if state else "OFF", retain=True) + info(f"Red LED → {'ON' if state else 'OFF'}") + return + + if topic == T_LED_GREEN: + state = payload.upper() == "ON" + led_green.value(1 if state else 0) + client_ref.publish(T_LED_GREEN_STATE, "ON" if state else "OFF", retain=True) + info(f"Green LED → {'ON' if state else 'OFF'}") + return + + if topic == T_LED_BL: + info(f"Backlight raw payload: '{payload}'") + try: + data = ujson.loads(payload) + info(f"Backlight parsed: state={data.get('state')} color={data.get('color')} brightness={data.get('brightness')}") + if data.get("state", "ON").upper() == "OFF": + set_backlight_brightness(0) + client_ref.publish(T_LED_BL_STATE, ujson.dumps({"state": "OFF"}), retain=True) + info("Backlight → OFF") + return + color = data.get("color", {}) + r = max(0, min(255, int(color.get("r", _backlight_color[0])))) + g = max(0, min(255, int(color.get("g", _backlight_color[1])))) + b = max(0, min(255, int(color.get("b", _backlight_color[2])))) + # HA sends brightness as 0-255; convert to 0-100 + raw_br = data.get("brightness", None) + if raw_br is not None: + brightness = max(0, min(100, round(int(raw_br) / 2.55))) + elif _backlight_brightness > 0: + brightness = _backlight_brightness + else: + brightness = 100 + except Exception as e: + warn(f"Invalid backlight payload: '{payload}' ({e})") + return + set_backlight_color(r, g, b, brightness) + color_hex = f"#{r:02x}{g:02x}{b:02x}" + state = {"state": "ON", "color_mode": "rgb", "brightness": int(brightness * 2.55), + "color": {"r": r, "g": g, "b": b}} + client_ref.publish(T_LED_BL_STATE, ujson.dumps(state), retain=True) + info(f"Backlight → {color_hex} @ {brightness}%") + return + + if topic == T_SET: + global _target_value + try: + _target_value = max(GAUGE_MIN, min(GAUGE_MAX, float(payload))) + info(f"New target → {_target_value:.1f}") + except ValueError: + warn(f"Invalid set value: '{payload}'") + +# --------------------------------------------------------------------------- +# MQTT connect + discovery +# --------------------------------------------------------------------------- + +def connect_mqtt(): + global client_ref + info(f"Connecting to MQTT broker {MQTT_BROKER}:{MQTT_PORT} ...") + client = MQTTClient( + client_id = MQTT_CLIENT_ID, + server = MQTT_BROKER, + port = MQTT_PORT, + user = MQTT_USER, + password = MQTT_PASSWORD, + keepalive = 60, + ) + client.set_last_will(T_STATUS, b"offline", retain=True, qos=0) + client.set_callback(on_message) + client.connect() + # Set client_ref AFTER connect so retained messages during subscribe + # are handled safely + client_ref = client + client.subscribe(T_SET) + client.subscribe(T_ZERO) + client.subscribe(T_LED_RED) + client.subscribe(T_LED_GREEN) + client.subscribe(T_LED_BL) + info(f"MQTT connected client_id={MQTT_CLIENT_ID}") + return client + +def publish_discovery(client): + """Publish all HA MQTT discovery payloads using short-form keys to stay under 512 bytes.""" + # Full device block only on first payload; subsequent use identifiers-only ref + _dev_ref = {"identifiers": [MQTT_CLIENT_ID]} + + client.publish(T_DISC_GAUGE, ujson.dumps({ + "name": GAUGE_ENTITY, + "unique_id": MQTT_CLIENT_ID, + "cmd_t": T_SET, + "stat_t": T_STATE, + "avty_t": T_STATUS, + "min": GAUGE_MIN, + "max": GAUGE_MAX, + "step": 1, + "unit_of_meas": GAUGE_UNIT, + "icon": "mdi:gauge", + "dev": _DEVICE, + }), retain=True) + info("Discovery: gauge") + + client.publish(T_DISC_RED, ujson.dumps({ + "name": RED_ENTITY, + "uniq_id": f"{MQTT_CLIENT_ID}_led_red", + "cmd_t": T_LED_RED, + "stat_t": T_LED_RED_STATE, + "pl_on": "ON", + "pl_off": "OFF", + "icon": "mdi:led-on", + "dev": _dev_ref, + "ret": True, + }), retain=True) + info("Discovery: red LED") + + client.publish(T_DISC_GREEN, ujson.dumps({ + "name": GREEN_ENTITY, + "uniq_id": f"{MQTT_CLIENT_ID}_led_green", + "cmd_t": T_LED_GREEN, + "stat_t": T_LED_GREEN_STATE, + "pl_on": "ON", + "pl_off": "OFF", + "icon": "mdi:led-on", + "dev": _dev_ref, + "ret": True, + }), retain=True) + info("Discovery: green LED") + + client.publish(T_DISC_BL, ujson.dumps({ + "name": BL_ENTITY, + "uniq_id": f"{MQTT_CLIENT_ID}_led_bl", + "cmd_t": T_LED_BL, + "stat_t": T_LED_BL_STATE, + "schema": "json", + "supported_color_modes": ["rgb"], + "icon": "mdi:led-strip", + "dev": _dev_ref, + "ret": True, + }), retain=True) + info("Discovery: backlight") + +def publish_state(client): + val = gauge.get() + client.publish(T_STATE, str(round(val, 1)), retain=True) + client.publish(T_STATUS, "online", retain=True) + info(f"State published value={val:.1f} step={gauge._current_step}") + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main(): + info("=" * 48) + info("Gauge MQTT controller starting") + info("=" * 48) + + connect_wifi(WIFI_SSID, WIFI_PASSWORD) + + info("Zeroing gauge on startup ...") + gauge.zero() + info("Zero complete") + + # umqtt.robust handles reconnection automatically — just connect once + client = connect_mqtt() + publish_discovery(client) + publish_state(client) + + info("Entering main loop") + info("-" * 48) + + try: + import ota + ota.mark_ok() + info("OTA OK flag set") + except ImportError: + pass + + global _last_rezero_ms, _bl_dirty_since + last_heartbeat = utime.ticks_ms() + _last_rezero_ms = utime.ticks_ms() + + target_step = gauge._val_to_step(_target_value) + + # Period at which to publish state updates during movement + MOVE_STATE_INTERVAL_MS = 500 + last_move_state = utime.ticks_ms() + + while True: + try: + client.check_msg() + + now = utime.ticks_ms() + + # Continuously move towards target at constant speed + current_target = gauge._val_to_step(_target_value) + moved = False + if current_target != gauge._current_step: + direction = 1 if current_target > gauge._current_step else -1 + gauge.step(direction) + moved = True + + # Publish state during movement at intervals + if moved and utime.ticks_diff(now, last_move_state) >= MOVE_STATE_INTERVAL_MS: + publish_state(client) + last_move_state = now + + # Sleep to achieve constant speed + if moved or (current_target == gauge._current_step and gauge._current_step != gauge._val_to_step(_target_value)): + delay_us = 1_000_000 // MICROSTEPS_PER_SECOND + utime.sleep_us(delay_us) + + # Periodic auto-rezero (disabled when interval is 0) + if REZERO_INTERVAL_MS > 0 and utime.ticks_diff(now, _last_rezero_ms) >= REZERO_INTERVAL_MS: + info("Auto-rezero triggered") + saved = _target_value + gauge.zero() + if saved > GAUGE_MIN: + gauge.set(saved) + publish_state(client) + _last_rezero_ms = now + info(f"Auto-rezero complete, restored to {saved:.1f}") + + # Retain backlight state via MQTT after settling + if _bl_dirty_since is not None and utime.ticks_diff(now, _bl_dirty_since) >= _BL_SAVE_DELAY_MS: + _flush_backlight(client) + _bl_dirty_since = None + + # Heartbeat + if utime.ticks_diff(now, last_heartbeat) >= HEARTBEAT_MS: + publish_state(client) + info(f"step={gauge._current_step} phase={gauge._phase} target_step={gauge._val_to_step(_target_value)} expected_phase={gauge._current_step % 4}") + last_heartbeat = now + + except Exception as e: + log_err(f"Main loop error: {e} — continuing") + utime.sleep_ms(100) + +main() + diff --git a/main.py b/main.py new file mode 100644 index 0000000..2b0e49b --- /dev/null +++ b/main.py @@ -0,0 +1 @@ +import gaugemqttcontinuous diff --git a/ota.py b/ota.py new file mode 100644 index 0000000..7a36313 --- /dev/null +++ b/ota.py @@ -0,0 +1,391 @@ +""" +ota.py — Gitea OTA updater for ESP32 / MicroPython + +Call ota.update() from boot.py before importing anything else. +If the update or the subsequent boot fails, the updater retries +on the next boot rather than bricking the device. + +Strategy +-------- +1. Fetch ota_manifest.txt from the repo to determine which files to sync. +2. Compare SHA1 hashes with a local manifest (.ota_manifest.json). +3. Download only changed or missing files, writing to .tmp first. +4. On success, rename .tmp files into place and update the manifest. +5. If anything fails mid-update, the manifest is not updated, so the + next boot will retry. Partially written .tmp files are cleaned up. +6. A "safety" flag file (.ota_ok) is written by main.py on successful + startup. If it is absent on boot, the previous update is suspected + bad — the manifest is wiped so all files are re-fetched cleanly. + +Manifest format (ota_manifest.txt) +--------------------------------- +Each line specifies a file or directory to include: + boot.py # specific file + ota.py # another file + selsyn/ # entire directory (trailing slash) + lib/ # another directory + *.py # wildcard (matches anywhere) + selsyn/*.py # wildcard in subdirectory + +Usage in boot.py +---------------- + import ota + ota.update() + # imports of main etc. go here + +Configuration +------------- +Edit the block below, or override from a local config file +(see SETTINGS_FILE). All settings can be left as module-level +constants or placed in /ota_config.json: + { + "gitea_base": "http://git.baumann.gr", + "repo_owner": "adebaumann", + "repo_name": "HomeControlPanel", + "repo_folder": "firmware", + "repo_branch": "main", + "api_token": "nicetry-nothere" + } +""" + +import os +import gc +import sys +import ujson +import urequests +import utime + +# --------------------------------------------------------------------------- +# Default configuration — override via /ota_config.json +# --------------------------------------------------------------------------- + +GITEA_BASE = "http://git.baumann.gr" # no trailing slash +REPO_OWNER = "adrian" +REPO_NAME = "esp32-gauge" +REPO_FOLDER = "firmware" # folder inside repo to sync +REPO_BRANCH = "main" +API_TOKEN = None # set to string for private repos + +WIFI_SSID = None +WIFI_PASSWORD = None + +SETTINGS_FILE = "/ota_config.json" +MANIFEST_FILE = "/.ota_manifest.json" +OK_FLAG_FILE = "/.ota_ok" +OTA_MANIFEST = "ota_manifest.txt" + +# --------------------------------------------------------------------------- +# Logging +# --------------------------------------------------------------------------- + +def _ts(): + ms = utime.ticks_ms() + return f"{(ms//3600000)%24:02d}:{(ms//60000)%60:02d}:{(ms//1000)%60:02d}.{ms%1000:03d}" + +def _log(level, msg): print(f"[{_ts()}] {level:5s} [OTA] {msg}") +def info(msg): _log("INFO", msg) +def warn(msg): _log("WARN", msg) +def log_err(msg): _log("ERROR", msg) + +# --------------------------------------------------------------------------- +# Config loader +# --------------------------------------------------------------------------- + +def load_config(): + global GITEA_BASE, REPO_OWNER, REPO_NAME, REPO_FOLDER, REPO_BRANCH, API_TOKEN, WIFI_SSID, WIFI_PASSWORD + try: + with open(SETTINGS_FILE) as f: + cfg = ujson.load(f) + GITEA_BASE = cfg.get("gitea_base", GITEA_BASE) + REPO_OWNER = cfg.get("repo_owner", REPO_OWNER) + REPO_NAME = cfg.get("repo_name", REPO_NAME) + REPO_FOLDER = cfg.get("repo_folder", REPO_FOLDER) + REPO_BRANCH = cfg.get("repo_branch", REPO_BRANCH) + API_TOKEN = cfg.get("api_token", API_TOKEN) + WIFI_SSID = cfg.get("wifi_ssid", WIFI_SSID) + WIFI_PASSWORD = cfg.get("wifi_password", WIFI_PASSWORD) + info(f"Config loaded from {SETTINGS_FILE}") + except OSError: + info(f"No {SETTINGS_FILE} found — using defaults") + except Exception as e: + warn(f"Config parse error: {e} — using defaults") + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _match_pattern(name, pattern): + if "*" not in pattern: + return name == pattern + i, n = 0, len(pattern) + j, m = 0, len(name) + star = -1 + while i < n and j < m: + if pattern[i] == "*": + star = i + i += 1 + elif pattern[i] == name[j]: + i += 1 + j += 1 + elif star >= 0: + i = star + 1 + j += 1 + else: + return False + while i < n and pattern[i] == "*": + i += 1 + return i == n and j == m + +def _fetch_manifest(): + url = ( + f"{GITEA_BASE}/api/v1/repos/{REPO_OWNER}/{REPO_NAME}" + f"/contents/{OTA_MANIFEST}?ref={REPO_BRANCH}" + ) + try: + r = urequests.get(url, headers=_headers()) + if r.status_code == 200: + data = r.json() + r.close() + if data.get("content"): + import ubinascii + content = ubinascii.a2b_base64(data["content"]).decode() + patterns = [line.strip() for line in content.splitlines()] + return [p for p in patterns if p and not p.startswith("#")] + else: + warn(f"Manifest not found at {OTA_MANIFEST}") + r.close() + except Exception as e: + log_err(f"Failed to fetch manifest: {e}") + return None + +def _fetch_dir(path): + url = ( + f"{GITEA_BASE}/api/v1/repos/{REPO_OWNER}/{REPO_NAME}" + f"/contents/{path}?ref={REPO_BRANCH}" + ) + return _api_get(url) + +def _api_get(url): + """GET a URL and return parsed JSON, or None on failure.""" + try: + r = urequests.get(url, headers=_headers()) + if r.status_code == 200: + data = r.json() + r.close() + return data + warn(f"HTTP {r.status_code} for {url}") + r.close() + except Exception as e: + log_err(f"GET {url} failed: {e}") + return None + +def _download(url, dest_path): + """Download url to dest_path. Returns True on success.""" + tmp = dest_path + ".tmp" + try: + r = urequests.get(url, headers=_headers()) + if r.status_code != 200: + warn(f"Download failed HTTP {r.status_code}: {url}") + r.close() + return False + with open(tmp, "wb") as f: + f.write(r.content) + r.close() + # Rename into place + try: + os.remove(dest_path) + except OSError: + pass + os.rename(tmp, dest_path) + return True + except Exception as e: + log_err(f"Download error {url}: {e}") + try: + os.remove(tmp) + except OSError: + pass + return False + +def _load_manifest(): + try: + with open(MANIFEST_FILE) as f: + return ujson.load(f) + except Exception: + return {} + +def _save_manifest(manifest): + try: + with open(MANIFEST_FILE, "w") as f: + ujson.dump(manifest, f) + except Exception as e: + warn(f"Could not save manifest: {e}") + +def _wipe_manifest(): + try: + os.remove(MANIFEST_FILE) + info("Manifest wiped — full re-fetch on next update") + except OSError: + pass + +def _ok_flag_exists(): + try: + os.stat(OK_FLAG_FILE) + return True + except OSError: + return False + +def _clear_ok_flag(): + try: + os.remove(OK_FLAG_FILE) + except OSError: + pass + +def mark_ok(): + """ + Call this from main.py after successful startup. + Signals to the OTA updater that the last update was good. + """ + try: + with open(OK_FLAG_FILE, "w") as f: + f.write("ok") + except Exception as e: + warn(f"Could not write OK flag: {e}") + +# --------------------------------------------------------------------------- +# Core update logic +# --------------------------------------------------------------------------- + +def _fetch_file_list(): + """ + Returns list of {name, sha, download_url} dicts based on the + ota_manifest.txt patterns in the repo folder, or None on failure. + """ + manifest_patterns = _fetch_manifest() + if manifest_patterns is None: + log_err("No manifest — cannot determine what to fetch") + return None + + info(f"Manifest patterns: {manifest_patterns}") + files = [] + visited = set() + + def fetch_matching(entries, patterns): + for entry in entries: + if entry.get("type") == "dir": + for p in patterns: + if p.endswith("/") and entry["name"].startswith(p.rstrip("/")): + sub = _fetch_dir(entry["path"]) + if sub: + fetch_matching(sub, patterns) + break + else: + name = entry["name"] + if not name.endswith(".py"): + continue + for p in patterns: + p = p.rstrip("/") + if _match_pattern(name, p) or _match_pattern(entry["path"], p): + if entry["path"] not in visited: + visited.add(entry["path"]) + files.append({ + "name": entry["path"], + "sha": entry["sha"], + "download_url": entry["download_url"], + }) + break + + root = _fetch_dir(REPO_FOLDER) + if root is None: + return None + + fetch_matching(root, manifest_patterns) + return files + +def _do_update(): + """ + Fetch file list, download changed files, update manifest. + Returns True if all succeeded (or nothing needed updating). + """ + info(f"Checking {GITEA_BASE}/{REPO_OWNER}/{REPO_NAME}/{REPO_FOLDER} @ {REPO_BRANCH}") + file_list = _fetch_file_list() + if file_list is None: + log_err("Could not fetch file list — skipping update") + return False + + info(f"Found {len(file_list)} file(s) to sync") + manifest = _load_manifest() + updated = [] + failed = [] + + for entry in file_list: + name = entry["name"] + sha = entry["sha"] + + if manifest.get(name) == sha: + info(f" {name} up to date") + continue + + info(f" {name} updating (sha={sha[:8]}...)") + gc.collect() + ok = _download(entry["download_url"], f"/{name}") + if ok: + manifest[name] = sha + updated.append(name) + info(f" {name} OK") + else: + failed.append(name) + log_err(f" {name} FAILED") + + if failed: + log_err(f"Update incomplete — {len(failed)} file(s) failed: {failed}") + # Save partial manifest so successful files aren't re-downloaded + _save_manifest(manifest) + return False + + _save_manifest(manifest) + + if updated: + info(f"Update complete — {len(updated)} file(s) updated: {updated}") + else: + info("All files up to date — nothing to do") + + return True + +# --------------------------------------------------------------------------- +# Public entry point +# --------------------------------------------------------------------------- + +def update(): + """ + Main entry point. Call from boot.py before importing application code. + + - If the OK flag is missing, the previous boot is assumed to have + failed — wipes the manifest so everything is re-fetched cleanly. + - Runs the update. + - Clears the OK flag so main.py must re-assert it on successful start. + """ + info("=" * 40) + info("OTA updater starting") + info("=" * 40) + + load_config() + + if not _ok_flag_exists(): + warn("OK flag missing — last boot may have failed") + warn("Wiping manifest to force full re-fetch") + _wipe_manifest() + else: + info("OK flag present — last boot was good") + + # Clear the flag now; main.py must call ota.mark_ok() to re-set it + _clear_ok_flag() + + success = _do_update() + + if success: + info("OTA check complete — booting application") + else: + warn("OTA check had errors — booting with current files") + + info("-" * 40) + gc.collect() diff --git a/ota_config.example.json b/ota_config.example.json new file mode 100644 index 0000000..c98637a --- /dev/null +++ b/ota_config.example.json @@ -0,0 +1,9 @@ +{ + "gitea_base": "http://git.baumann.gr", + "repo_owner": "adebaumann", + "repo_name": "Selsyn_inspired_gauge", + "repo_folder": "", + "repo_branch": "main", + "wifi_ssid": "YourNetwork", + "wifi_password": "YourPassword" +} diff --git a/ota_manifest.txt b/ota_manifest.txt new file mode 100644 index 0000000..f104652 --- /dev/null +++ b/ota_manifest.txt @@ -0,0 +1 @@ +*.py