3 Commits
dev ... wrover

Author SHA1 Message Date
6209cb0b2d Troubleshooting 2026-04-12 02:42:07 +02:00
179f202dfe Troubleshooting 2026-04-12 02:38:14 +02:00
906922357d Troubleshooting 2026-04-12 02:24:36 +02:00
13 changed files with 227 additions and 211 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -28,5 +28,5 @@
"red_led_entity_name": "Selsyn 1 Red LED",
"green_led_entity_name": "Selsyn 1 Green LED",
"backlight_entity_name": "Selsyn 1 Backlight",
"ws2812_order": "GRB"
"backlight_unit": "%"
}

View File

@@ -47,7 +47,6 @@
"pin": 23,
"num_leds_per_gauge": 3,
"num_status_leds_per_gauge": 2
"ws2812_order": "GRB"
},
"device": {

View File

@@ -75,7 +75,6 @@ class Gauge:
self._current_step = 0
self._zeroed = False
self._last_dir = None
def zero(self):
overrun = _OVERRUN_STEPS
@@ -109,7 +108,7 @@ class Gauge:
else:
for _ in range(abs(delta)):
self._step(1 if delta > 0 else -1)
utime.sleep_us(self._step_us)
utime.sleep_ms(self._step_us)
self._current_step = target_step
@@ -129,20 +128,6 @@ class Gauge:
self._pin_step.value(0)
utime.sleep_us(delay_us)
def steps_toward(self, value, limit=5, deadband=0.5):
"""Return the step delta needed to move toward value, clamped to ±limit.
deadband: If error is less than this fraction of one step, return 0 to prevent
micro-corrections due to floating-point rounding. Default 0.5 means
no movement if error < half a step.
"""
target_step = self._val_to_step(value)
delta = target_step - self._current_step
deadband_steps = deadband
if abs(delta) < deadband_steps:
return 0
return max(-limit, min(limit, delta))
def get(self):
return self._step_to_val(self._current_step)

View File

@@ -1,12 +1,12 @@
"""
gaugemqttcontinuous.py — MQTT-based gauge controller for ESP32 / MicroPython
gaugemqtt.py — MQTT-based gauge controller for ESP32 / MicroPython
Deploy these files to the ESP32:
gauge_vid6008.py — stepper driver
gaugemqttcontinuous.py — this file
umqtt/simple.py — MicroPython built-in
umqtt/robust.py — https://raw.githubusercontent.com/micropython/micropython-lib/master/micropython/umqtt.robust/umqtt/robust.py
config.json — configuration (see below)
gauge.py — stepper driver
gaugemqtt.py — this file
umqtt/simple.py — MicroPython built-in
umqtt/robust.py — https://raw.githubusercontent.com/micropython/micropython-lib/master/micropython/umqtt.robust/umqtt/robust.py
config.json — configuration (see below)
MQTT topics (all prefixed with mqtt_prefix from config.json):
.../set ← HA publishes target value here
@@ -81,7 +81,6 @@ _cfg = _load_config()
DEBUG = _cfg.get("debug", False)
_DEBUG = DEBUG
_WS2812_ORDER = _cfg.get("ws2812_order", "GRB").upper()
WIFI_SSID = _cfg["wifi_ssid"]
WIFI_PASSWORD = _cfg["wifi_password"]
@@ -150,11 +149,13 @@ else:
"ws2812_green": tuple(_cfg.get("ws2812_green", [0, 255, 0])),
}
)
BL_UNIT = _cfg.get("backlight_unit", "%")
# ---------------------------------------------------------------------------
# Gauge initialization
# ---------------------------------------------------------------------------
info("Initialising gauge objects...")
gauge_objects = []
for g in gauges:
gauge_objects.append(
@@ -169,6 +170,7 @@ for g in gauges:
info(
f"Gauge {g['id']}: {g['name']} pins={g['pins']} mode={g['mode']} range=[{g['min']}, {g['max']}]"
)
info("Gauge objects done")
gauge_targets = [g["min"] for g in gauges] # target value per gauge
gauge_last_rezero = [utime.ticks_ms() for _ in gauges]
@@ -205,10 +207,41 @@ def make_gauge_topics(prefix, gauge_id):
gauge_topics = [make_gauge_topics(MQTT_PREFIX, g["id"]) for g in gauges]
T_SET = f"{MQTT_PREFIX}/set"
T_STATE = f"{MQTT_PREFIX}/state"
T_STATUS = f"{MQTT_PREFIX}/status"
T_ZERO = f"{MQTT_PREFIX}/zero"
T_DISC_GAUGE = f"homeassistant/number/{MQTT_CLIENT_ID}/config"
T_DISC_RED = f"homeassistant/switch/{MQTT_CLIENT_ID}_red/config"
T_DISC_GREEN = f"homeassistant/switch/{MQTT_CLIENT_ID}_green/config"
T_DISC_BL = f"homeassistant/light/{MQTT_CLIENT_ID}_bl/config"
def make_gauge_topics(prefix, gauge_id):
return {
"set": f"{prefix}/gauge{gauge_id}/set",
"state": f"{prefix}/gauge{gauge_id}/state",
"status": f"{prefix}/gauge{gauge_id}/status",
"zero": f"{prefix}/gauge{gauge_id}/zero",
"disc": f"homeassistant/number/{MQTT_CLIENT_ID}_g{gauge_id}/config",
"led_red": f"{prefix}/gauge{gauge_id}/led/red/set",
"led_green": f"{prefix}/gauge{gauge_id}/led/green/set",
"led_bl": f"{prefix}/gauge{gauge_id}/led/backlight/set",
"led_red_state": f"{prefix}/gauge{gauge_id}/led/red/state",
"led_green_state": f"{prefix}/gauge{gauge_id}/led/green/state",
"led_bl_state": f"{prefix}/gauge{gauge_id}/led/backlight/state",
"led_red_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_red/config",
"led_green_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_green/config",
"led_bl_disc": f"homeassistant/light/{MQTT_CLIENT_ID}_g{gauge_id}_bl/config",
"status_red": f"{prefix}/gauge{gauge_id}/status_led/red/set",
"status_green": f"{prefix}/gauge{gauge_id}/status_led/green/set",
"status_red_state": f"{prefix}/gauge{gauge_id}/status_led/red/state",
"status_green_state": f"{prefix}/gauge{gauge_id}/status_led/green/state",
"status_red_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_status_red/config",
"status_green_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_status_green/config",
}
_DEVICE = {
"identifiers": [MQTT_CLIENT_ID],
@@ -222,10 +255,12 @@ _DEVICE = {
# WiFi
# ---------------------------------------------------------------------------
_wifi_reconnect_delay_s = 5
_wifi_check_interval_ms = 30000
_last_wifi_check = 0
_wifi_sta = None
def connect_wifi(ssid, password, timeout_s=15):
global _wifi_sta
_wifi_sta = network.WLAN(network.STA_IF)
@@ -253,7 +288,7 @@ def connect_wifi(ssid, password, timeout_s=15):
def check_wifi():
global _wifi_sta, _last_wifi_check
global _wifi_sta, _last_wifi_check, _wifi_reconnect_delay_s
now = utime.ticks_ms()
if utime.ticks_diff(now, _last_wifi_check) < _wifi_check_interval_ms:
return
@@ -281,15 +316,27 @@ def check_wifi():
log_err(f"WiFi reconnect failed: {e}")
# ---------------------------------------------------------------------------
# LEDs (per gauge)
# ---------------------------------------------------------------------------
info("Initialising LEDs...")
num_gauges = len(gauges)
leds_red = []
leds_green = []
leds_bl = []
for g in gauges:
leds_red.append(Pin(g["red_pin"], Pin.OUT, value=0))
leds_green.append(Pin(g["green_pin"], Pin.OUT, value=0))
total_backlight_leds = num_gauges * (BACKLIGHT_LEDS_PER_GAUGE + STATUS_LEDS_PER_GAUGE)
leds_bl = NeoPixel(Pin(BACKLIGHT_PIN), total_backlight_leds)
info(f"Total backlight LEDs: {total_backlight_leds}")
leds_bl = (
NeoPixel(Pin(BACKLIGHT_PIN), total_backlight_leds)
if total_backlight_leds > 0
else None
)
info("LEDs done")
backlight_color = [(0, 0, 0) for _ in range(num_gauges)]
backlight_brightness = [100 for _ in range(num_gauges)]
@@ -302,11 +349,22 @@ _bl_dirty_since = None
_BL_SAVE_DELAY_MS = 5000
def _to_pixel(r, g, b):
"""Reorder RGB to match the WS2812 variant's byte order (GRB or RGB)."""
if _WS2812_ORDER == "GRB":
return (g, r, b)
return (r, g, b)
def _flush_backlight(client):
for i in range(num_gauges):
gt = gauge_topics[i]
payload = {
"state": "ON" if backlight_on[i] else "OFF",
"color": {
"r": backlight_color[i][0],
"g": backlight_color[i][1],
"b": backlight_color[i][2],
},
"brightness": int(backlight_brightness[i] * 2.55),
}
client.publish(gt["led_bl_state"], ujson.dumps(payload), retain=True)
info(
f"Gauge {i} backlight: {payload['state']} {backlight_color[i]} @ {backlight_brightness[i]}%"
)
def _backlight_changed(gauge_idx, new_color, new_on, new_brightness):
@@ -322,19 +380,6 @@ def _mark_bl_dirty():
_bl_dirty_since = utime.ticks_ms()
def _apply_backlight(gauge_idx, r, g, b, brightness):
"""Write RGB+brightness to the physical LEDs and mark dirty."""
scale = brightness / 100
leds_per_gauge = BACKLIGHT_LEDS_PER_GAUGE + STATUS_LEDS_PER_GAUGE
base_idx = gauge_idx * leds_per_gauge
pixel = _to_pixel(int(r * scale), int(g * scale), int(b * scale))
for j in range(BACKLIGHT_LEDS_PER_GAUGE):
leds_bl[base_idx + j] = pixel
_update_status_leds(gauge_idx)
leds_bl.write()
_mark_bl_dirty()
def set_backlight_color(gauge_idx, r, g, b, brightness=None):
global backlight_color, backlight_brightness, backlight_on
if brightness is None:
@@ -346,7 +391,17 @@ def set_backlight_color(gauge_idx, r, g, b, brightness=None):
if brightness > 0:
backlight_brightness[gauge_idx] = brightness
backlight_on[gauge_idx] = new_on
_apply_backlight(gauge_idx, r, g, b, brightness)
scale = brightness / 100
leds_per_gauge = BACKLIGHT_LEDS_PER_GAUGE + STATUS_LEDS_PER_GAUGE
base_idx = gauge_idx * leds_per_gauge
for j in range(BACKLIGHT_LEDS_PER_GAUGE):
if leds_bl:
leds_bl[base_idx + j] = (int(g * scale), int(r * scale), int(b * scale))
_update_status_leds(gauge_idx)
if leds_bl:
leds_bl.write()
_mark_bl_dirty()
def set_backlight_brightness(gauge_idx, brightness):
@@ -359,10 +414,21 @@ def set_backlight_brightness(gauge_idx, brightness):
backlight_brightness[gauge_idx] = clamped
backlight_on[gauge_idx] = new_on
r, g, b = backlight_color[gauge_idx]
_apply_backlight(gauge_idx, r, g, b, clamped)
scale = clamped / 100
leds_per_gauge = BACKLIGHT_LEDS_PER_GAUGE + STATUS_LEDS_PER_GAUGE
base_idx = gauge_idx * leds_per_gauge
for j in range(BACKLIGHT_LEDS_PER_GAUGE):
if leds_bl:
leds_bl[base_idx + j] = (int(g * scale), int(r * scale), int(b * scale))
_update_status_leds(gauge_idx)
if leds_bl:
leds_bl.write()
_mark_bl_dirty()
def _update_status_leds(gauge_idx):
if not leds_bl:
return
leds_per_gauge = BACKLIGHT_LEDS_PER_GAUGE + STATUS_LEDS_PER_GAUGE
base_idx = gauge_idx * leds_per_gauge + BACKLIGHT_LEDS_PER_GAUGE
@@ -371,12 +437,12 @@ def _update_status_leds(gauge_idx):
green_color = g_cfg["ws2812_green"]
if status_led_red[gauge_idx]:
leds_bl[base_idx] = _to_pixel(*red_color)
leds_bl[base_idx] = (red_color[1], red_color[0], red_color[2])
else:
leds_bl[base_idx] = (0, 0, 0)
if status_led_green[gauge_idx]:
leds_bl[base_idx + 1] = _to_pixel(*green_color)
leds_bl[base_idx + 1] = (green_color[1], green_color[0], green_color[2])
else:
leds_bl[base_idx + 1] = (0, 0, 0)
@@ -388,38 +454,18 @@ def set_status_led(gauge_idx, led_type, state):
elif led_type == "green":
status_led_green[gauge_idx] = state
_update_status_leds(gauge_idx)
leds_bl.write()
if leds_bl:
leds_bl.write()
def publish_backlight_states(client):
"""Publish current backlight state for all gauges as retained MQTT messages."""
for i in range(num_gauges):
gt = gauge_topics[i]
r, g, b = backlight_color[i]
brightness = backlight_brightness[i]
state = {
"state": "ON" if backlight_on[i] else "OFF",
"color_mode": "rgb",
"brightness": int(brightness * 2.55),
"color": {"r": r, "g": g, "b": b},
}
try:
client.publish(gt["led_bl_state"], ujson.dumps(state), retain=True)
except Exception as e:
log_err(f"Backlight state publish failed for gauge {i}: {e}")
# ---------------------------------------------------------------------------
# State
# ---------------------------------------------------------------------------
def _flush_backlight_state():
global _bl_dirty_since
if _bl_dirty_since is None:
return
if utime.ticks_diff(utime.ticks_ms(), _bl_dirty_since) < _BL_SAVE_DELAY_MS:
return
if client_ref is None:
return
publish_backlight_states(client_ref)
_bl_dirty_since = None
info("Backlight state flushed to MQTT")
_last_rezero_ms = None # set to ticks_ms() in main()
client_ref = None
_mqtt_connected = False
_last_mqtt_check = 0
def _publish(topic, payload, retain=False):
@@ -463,6 +509,15 @@ def on_message(topic, payload):
warn(f"Invalid set value for gauge {i}: '{payload}'")
return
if topic == T_ZERO:
for i, g in enumerate(gauge_objects):
info(f"Zeroing all gauges")
g.zero()
gauge_last_rezero[i] = utime.ticks_ms()
info("All gauges zeroed")
return
for i, gt in enumerate(gauge_topics):
if topic == gt["led_red"]:
state = payload.upper() == "ON"
leds_red[i].value(1 if state else 0)
@@ -526,59 +581,12 @@ def on_message(topic, payload):
info(f"Gauge {i} status green → {'ON' if state else 'OFF'}")
return
if topic == T_ZERO:
for i, g in enumerate(gauge_objects):
g.zero()
gauge_last_rezero[i] = utime.ticks_ms()
info("All gauges zeroed")
return
if topic == T_SET:
try:
data = ujson.loads(payload)
if isinstance(data, dict):
for i, val in enumerate(data.values()):
if i < len(gauges):
g = gauges[i]
gauge_targets[i] = max(g["min"], min(g["max"], float(val)))
info(f"Gauge {i} target → {gauge_targets[i]:.1f}")
else:
val = float(payload)
for i in range(len(gauges)):
gauge_targets[i] = max(gauges[i]["min"], min(gauges[i]["max"], val))
info(f"All gauges target → {val:.1f}")
except Exception:
try:
val = float(payload)
for i in range(len(gauges)):
gauge_targets[i] = max(gauges[i]["min"], min(gauges[i]["max"], val))
info(f"All gauges target → {val:.1f}")
except:
warn(f"Invalid set value: '{payload}'")
return
# ---------------------------------------------------------------------------
# MQTT connect + discovery
# ---------------------------------------------------------------------------
def _subscribe_all(c):
c.subscribe(f"{MQTT_PREFIX}/set")
c.subscribe(f"{MQTT_PREFIX}/zero")
for i in range(num_gauges):
prefix = f"{MQTT_PREFIX}/gauge{i}"
c.subscribe(f"{prefix}/set")
c.subscribe(f"{prefix}/zero")
c.subscribe(f"{prefix}/led/red/set")
c.subscribe(f"{prefix}/led/green/set")
c.subscribe(f"{prefix}/led/backlight/set")
c.subscribe(f"{prefix}/status_led/red/set")
c.subscribe(f"{prefix}/status_led/green/set")
def connect_mqtt():
global client_ref, _mqtt_connected
info(f"Connecting to MQTT broker {MQTT_BROKER}:{MQTT_PORT} ...")
@@ -588,22 +596,29 @@ def connect_mqtt():
port=MQTT_PORT,
user=MQTT_USER,
password=MQTT_PASSWORD,
keepalive=30,
keepalive=60,
)
# Don't set last will - it might be causing issues
# client.set_last_will(T_STATUS, b"offline", retain=True, qos=0)
client.set_last_will(gauge_topics[0]["status"], b"offline", retain=True, qos=0)
client.set_callback(on_message)
client.connect()
client_ref = client
client.subscribe(T_SET)
client.subscribe(T_ZERO)
for gt in gauge_topics:
client.subscribe(gt["set"])
client.subscribe(gt["zero"])
client.subscribe(gt["led_red"])
client.subscribe(gt["led_green"])
client.subscribe(gt["led_bl"])
client.subscribe(gt["status_red"])
client.subscribe(gt["status_green"])
_mqtt_connected = True
info(f"MQTT connected client_id={MQTT_CLIENT_ID}")
return client
_mqtt_check_interval_ms = 30000
_last_mqtt_check = 0
client_ref = None
_mqtt_connected = False
def check_mqtt():
@@ -634,16 +649,27 @@ def check_mqtt():
port=MQTT_PORT,
user=MQTT_USER,
password=MQTT_PASSWORD,
keepalive=30,
keepalive=60,
)
client_ref.set_last_will(
gauge_topics[0]["status"], b"offline", retain=True, qos=0
)
client_ref.set_callback(on_message)
client_ref.connect()
client_ref.subscribe(T_SET)
client_ref.subscribe(T_ZERO)
for gt in gauge_topics:
client_ref.subscribe(gt["set"])
client_ref.subscribe(gt["zero"])
client_ref.subscribe(gt["led_red"])
client_ref.subscribe(gt["led_green"])
client_ref.subscribe(gt["led_bl"])
client_ref.subscribe(gt["status_red"])
client_ref.subscribe(gt["status_green"])
_mqtt_connected = True
info("MQTT reconnected!")
publish_discovery(client_ref)
_subscribe_all(client_ref)
publish_state(client_ref)
publish_backlight_states(client_ref)
return True
except Exception as e2:
log_err(f"MQTT reconnect attempt {attempt + 1} failed: {e2}")
@@ -653,10 +679,15 @@ def check_mqtt():
return False
def publish_discovery(client):
"""Publish all HA MQTT discovery payloads for gauges and LEDs."""
_dev_ref = _DEVICE
_dev_ref = {
"identifiers": [MQTT_CLIENT_ID],
"name": DEVICE_NAME,
"model": DEVICE_MODEL,
"manufacturer": DEVICE_MFR,
"suggested_area": DEVICE_AREA,
}
for i, g in enumerate(gauges):
gt = gauge_topics[i]
@@ -682,11 +713,6 @@ def publish_discovery(client):
)
info(f"Discovery: gauge {i} ({g['name']})")
# Process MQTT messages between gauges
for _ in range(5):
client.check_msg()
utime.sleep_ms(10)
client.publish(
gt["led_red_disc"],
ujson.dumps(
@@ -725,11 +751,6 @@ def publish_discovery(client):
)
info(f"Discovery: gauge {i} green LED")
# Process MQTT messages
for _ in range(5):
client.check_msg()
utime.sleep_ms(10)
client.publish(
gt["led_bl_disc"],
ujson.dumps(
@@ -787,17 +808,14 @@ def publish_discovery(client):
)
info(f"Discovery: gauge {i} status green")
# Process between gauges to avoid MQTT blocking
for _ in range(5):
client.check_msg()
utime.sleep_ms(10)
def publish_state(client):
for i, g in enumerate(gauge_objects):
gt = gauge_topics[i]
val = g.get()
client.publish(gt["state"], str(val))
client.publish(gt["state"], str(round(val, 1)), retain=True)
client.publish(gt["status"], "online", retain=True)
info(f"Gauge {i} state: {val:.1f} step={g._current_step}")
# ---------------------------------------------------------------------------
@@ -806,58 +824,47 @@ def publish_state(client):
def main():
utime.sleep_ms(0)
info("=" * 48)
info("Gauge MQTT controller starting")
info("=" * 48)
info("Connecting WiFi...")
connect_wifi(WIFI_SSID, WIFI_PASSWORD)
info("WiFi done")
# Connect MQTT (no subscriptions yet — keeps broker silent during discovery)
connect_mqtt()
# Publish discovery — broker has nothing to send back yet
info("Publishing discovery...")
publish_discovery(client_ref)
# Subscribe now — retained messages will start arriving from here
_subscribe_all(client_ref)
info("Draining initial retained messages...")
for _ in range(50):
client_ref.check_msg()
utime.sleep_ms(20)
# Now initialize gauges
info("Zeroing gauges on startup ...")
for i, g in enumerate(gauge_objects):
g.zero()
info(f"Zeroed gauge {i}")
info("Zero complete")
info("Zero done")
info("Publishing state...")
info("Connecting MQTT...")
connect_mqtt()
info("MQTT done")
info("Publishing discovery...")
publish_discovery(client_ref)
publish_state(client_ref)
utime.sleep_ms(50)
for _ in range(5):
client_ref.check_msg()
utime.sleep_ms(20)
info("Discovery done")
info("Entering main loop")
info("-" * 48)
try:
import ota
ota.mark_ok()
except:
info("OTA OK flag set")
except ImportError:
pass
# Initialize variables for main loop
global _bl_dirty_since
last_heartbeat = utime.ticks_ms()
now = 0
was_moving = False
while True:
utime.sleep_ms(0)
try:
now = utime.ticks_ms()
check_wifi()
if not check_mqtt():
@@ -865,33 +872,50 @@ def main():
continue
client_ref.check_msg()
_flush_backlight_state()
pending = [g.steps_toward(gauge_targets[i],limit=50) for i, g in enumerate(gauge_objects)]
now = utime.ticks_ms()
moved_any = False
for i, g in enumerate(gauge_objects):
current_target = g._val_to_step(gauge_targets[i])
if current_target != g._current_step:
direction = 1 if current_target > g._current_step else -1
steps_to_move = current_target - g._current_step
steps_to_move = max(-5, min(5, steps_to_move))
for _ in range(abs(steps_to_move)):
g.step(direction)
moved_any = True
moved_any = any(s != 0 for s in pending)
if moved_any:
was_moving = True
delay_us = 1_000_000 // MICROSTEPS_PER_SECOND
for tick in range(max(abs(s) for s in pending)):
for i, g in enumerate(gauge_objects):
if tick < abs(pending[i]):
g.step(1 if pending[i] > 0 else -1)
utime.sleep_us(delay_us)
else:
if was_moving:
publish_state(client_ref)
was_moving = False
utime.sleep_ms(10)
utime.sleep_us(delay_us)
if (
REZERO_INTERVAL_MS > 0
and utime.ticks_diff(now, gauge_last_rezero[0]) >= REZERO_INTERVAL_MS
):
for i, g in enumerate(gauge_objects):
info(f"Auto-rezero gauge {i}")
saved = gauge_targets[i]
g.zero()
if saved > gauges[i]["min"]:
g.set(saved)
gauge_last_rezero[i] = now
publish_state(client_ref)
info("Auto-rezero complete")
if (
_bl_dirty_since is not None
and utime.ticks_diff(now, _bl_dirty_since) >= _BL_SAVE_DELAY_MS
):
_flush_backlight(client_ref)
_bl_dirty_since = None
if utime.ticks_diff(now, last_heartbeat) >= HEARTBEAT_MS:
info(f"Heartbeat: {gauge_targets}")
publish_state(client_ref)
last_heartbeat = now
except Exception as e:
import sys
sys.print_exception(e)
log_err(f"Main loop error: {e} — continuing")
utime.sleep_ms(100)

32
ota.py
View File

@@ -198,19 +198,18 @@ def _fetch_manifest():
)
try:
r = urequests.get(url, headers=_headers())
try:
if r.status_code == 200:
data = r.json()
if data.get("content"):
import ubinascii
content = ubinascii.a2b_base64(data["content"]).decode()
patterns = [line.strip() for line in content.splitlines()]
return [p for p in patterns if p and not p.startswith("#")]
else:
warn(f"Manifest not found at {OTA_MANIFEST}")
finally:
if r.status_code == 200:
data = r.json()
r.close()
if data.get("content"):
import ubinascii
content = ubinascii.a2b_base64(data["content"]).decode()
patterns = [line.strip() for line in content.splitlines()]
return [p for p in patterns if p and not p.startswith("#")]
else:
warn(f"Manifest not found at {OTA_MANIFEST}")
r.close()
except Exception as e:
log_err(f"Failed to fetch manifest: {e}")
return None
@@ -285,6 +284,13 @@ def _save_manifest(manifest, commit_sha=None):
warn(f"Could not save manifest: {e}")
def _wipe_manifest():
try:
os.remove(MANIFEST_FILE)
info("Manifest wiped — full re-fetch on next update")
except OSError:
pass
def _ok_flag_exists():
try:
@@ -343,6 +349,8 @@ def _fetch_file_list():
break
else:
name = entry["name"]
if not name.endswith(".py"):
continue
for p in patterns:
p = p.rstrip("/")
if _match_pattern(name, p) or _match_pattern(entry["path"], p):