8 Commits
main ... wrover

5 changed files with 239 additions and 71 deletions

BIN
3d_model/Box for one.stl Normal file

Binary file not shown.

BIN
3d_model/case_test.stl Normal file

Binary file not shown.

View File

@@ -20,7 +20,9 @@
"unit": "W", "unit": "W",
"leds": { "leds": {
"red_pin": 33, "red_pin": 33,
"green_pin": 32 "green_pin": 32,
"ws2812_red": [255, 0, 0],
"ws2812_green": [0, 255, 0]
} }
}, },
{ {
@@ -34,14 +36,17 @@
"unit": "C", "unit": "C",
"leds": { "leds": {
"red_pin": 21, "red_pin": 21,
"green_pin": 20 "green_pin": 20,
"ws2812_red": [255, 0, 0],
"ws2812_green": [0, 255, 0]
} }
} }
], ],
"backlight": { "backlight": {
"pin": 23, "pin": 23,
"num_leds_per_gauge": 3 "num_leds_per_gauge": 3,
"num_status_leds_per_gauge": 2
}, },
"device": { "device": {

View File

@@ -98,6 +98,7 @@ REZERO_INTERVAL_MS = int(_cfg.get("rezero_interval_ms", 3600000))
backlight_cfg = _cfg.get("backlight", {}) backlight_cfg = _cfg.get("backlight", {})
BACKLIGHT_PIN = int(backlight_cfg.get("pin", _cfg.get("led_bl_pin", 23))) BACKLIGHT_PIN = int(backlight_cfg.get("pin", _cfg.get("led_bl_pin", 23)))
BACKLIGHT_LEDS_PER_GAUGE = int(backlight_cfg.get("num_leds_per_gauge", 3)) BACKLIGHT_LEDS_PER_GAUGE = int(backlight_cfg.get("num_leds_per_gauge", 3))
STATUS_LEDS_PER_GAUGE = int(backlight_cfg.get("num_status_leds_per_gauge", 2))
device_cfg = _cfg.get("device", {}) device_cfg = _cfg.get("device", {})
DEVICE_NAME = device_cfg.get("name", _cfg.get("device_name", "Selsyn Multi")) DEVICE_NAME = device_cfg.get("name", _cfg.get("device_name", "Selsyn Multi"))
@@ -126,6 +127,8 @@ if "gauges" in _cfg:
"unit": g.get("unit", ""), "unit": g.get("unit", ""),
"red_pin": int(led_cfg.get("red_pin", 33)), "red_pin": int(led_cfg.get("red_pin", 33)),
"green_pin": int(led_cfg.get("green_pin", 32)), "green_pin": int(led_cfg.get("green_pin", 32)),
"ws2812_red": tuple(led_cfg.get("ws2812_red", [255, 0, 0])),
"ws2812_green": tuple(led_cfg.get("ws2812_green", [0, 255, 0])),
} }
) )
else: else:
@@ -142,6 +145,8 @@ else:
"unit": _cfg.get("gauge_unit", "W"), "unit": _cfg.get("gauge_unit", "W"),
"red_pin": int(_cfg.get("led_red_pin", 33)), "red_pin": int(_cfg.get("led_red_pin", 33)),
"green_pin": int(_cfg.get("led_green_pin", 32)), "green_pin": int(_cfg.get("led_green_pin", 32)),
"ws2812_red": tuple(_cfg.get("ws2812_red", [255, 0, 0])),
"ws2812_green": tuple(_cfg.get("ws2812_green", [0, 255, 0])),
} }
) )
BL_UNIT = _cfg.get("backlight_unit", "%") BL_UNIT = _cfg.get("backlight_unit", "%")
@@ -150,6 +155,7 @@ BL_UNIT = _cfg.get("backlight_unit", "%")
# Gauge initialization # Gauge initialization
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
info("Initialising gauge objects...")
gauge_objects = [] gauge_objects = []
for g in gauges: for g in gauges:
gauge_objects.append( gauge_objects.append(
@@ -164,6 +170,7 @@ for g in gauges:
info( info(
f"Gauge {g['id']}: {g['name']} pins={g['pins']} mode={g['mode']} range=[{g['min']}, {g['max']}]" f"Gauge {g['id']}: {g['name']} pins={g['pins']} mode={g['mode']} range=[{g['min']}, {g['max']}]"
) )
info("Gauge objects done")
gauge_targets = [g["min"] for g in gauges] # target value per gauge gauge_targets = [g["min"] for g in gauges] # target value per gauge
gauge_last_rezero = [utime.ticks_ms() for _ in gauges] gauge_last_rezero = [utime.ticks_ms() for _ in gauges]
@@ -189,17 +196,15 @@ def make_gauge_topics(prefix, gauge_id):
"led_red_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_red/config", "led_red_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_red/config",
"led_green_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_green/config", "led_green_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_green/config",
"led_bl_disc": f"homeassistant/light/{MQTT_CLIENT_ID}_g{gauge_id}_bl/config", "led_bl_disc": f"homeassistant/light/{MQTT_CLIENT_ID}_g{gauge_id}_bl/config",
"status_red": f"{prefix}/gauge{gauge_id}/status_led/red/set",
"status_green": f"{prefix}/gauge{gauge_id}/status_led/green/set",
"status_red_state": f"{prefix}/gauge{gauge_id}/status_led/red/state",
"status_green_state": f"{prefix}/gauge{gauge_id}/status_led/green/state",
"status_red_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_status_red/config",
"status_green_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_status_green/config",
} }
gauge_topics = [make_gauge_topics(MQTT_PREFIX, g["id"]) for g in gauges]
T_SET = f"{MQTT_PREFIX}/set"
T_STATE = f"{MQTT_PREFIX}/state"
T_STATUS = f"{MQTT_PREFIX}/status"
T_ZERO = f"{MQTT_PREFIX}/zero"
gauge_topics = [make_gauge_topics(MQTT_PREFIX, g["id"]) for g in gauges] gauge_topics = [make_gauge_topics(MQTT_PREFIX, g["id"]) for g in gauges]
T_SET = f"{MQTT_PREFIX}/set" T_SET = f"{MQTT_PREFIX}/set"
@@ -229,6 +234,12 @@ def make_gauge_topics(prefix, gauge_id):
"led_red_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_red/config", "led_red_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_red/config",
"led_green_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_green/config", "led_green_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_green/config",
"led_bl_disc": f"homeassistant/light/{MQTT_CLIENT_ID}_g{gauge_id}_bl/config", "led_bl_disc": f"homeassistant/light/{MQTT_CLIENT_ID}_g{gauge_id}_bl/config",
"status_red": f"{prefix}/gauge{gauge_id}/status_led/red/set",
"status_green": f"{prefix}/gauge{gauge_id}/status_led/green/set",
"status_red_state": f"{prefix}/gauge{gauge_id}/status_led/red/state",
"status_green_state": f"{prefix}/gauge{gauge_id}/status_led/green/state",
"status_red_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_status_red/config",
"status_green_disc": f"homeassistant/switch/{MQTT_CLIENT_ID}_g{gauge_id}_status_green/config",
} }
@@ -309,6 +320,7 @@ def check_wifi():
# LEDs (per gauge) # LEDs (per gauge)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
info("Initialising LEDs...")
num_gauges = len(gauges) num_gauges = len(gauges)
leds_red = [] leds_red = []
leds_green = [] leds_green = []
@@ -317,19 +329,25 @@ for g in gauges:
leds_red.append(Pin(g["red_pin"], Pin.OUT, value=0)) leds_red.append(Pin(g["red_pin"], Pin.OUT, value=0))
leds_green.append(Pin(g["green_pin"], Pin.OUT, value=0)) leds_green.append(Pin(g["green_pin"], Pin.OUT, value=0))
total_backlight_leds = num_gauges * BACKLIGHT_LEDS_PER_GAUGE total_backlight_leds = num_gauges * (BACKLIGHT_LEDS_PER_GAUGE + STATUS_LEDS_PER_GAUGE)
leds_bl = NeoPixel(Pin(BACKLIGHT_PIN), total_backlight_leds) info(f"Total backlight LEDs: {total_backlight_leds}")
leds_bl = (
_backlight_color = (0, 0, 0) NeoPixel(Pin(BACKLIGHT_PIN), total_backlight_leds)
_backlight_brightness = 100 if total_backlight_leds > 0
_backlight_on = False else None
_bl_dirty_since = None )
_BL_SAVE_DELAY_MS = 5000 info("LEDs done")
backlight_color = [(0, 0, 0) for _ in range(num_gauges)] backlight_color = [(0, 0, 0) for _ in range(num_gauges)]
backlight_brightness = [100 for _ in range(num_gauges)] backlight_brightness = [100 for _ in range(num_gauges)]
backlight_on = [False for _ in range(num_gauges)] backlight_on = [False for _ in range(num_gauges)]
status_led_red = [False for _ in range(num_gauges)]
status_led_green = [False for _ in range(num_gauges)]
_bl_dirty_since = None
_BL_SAVE_DELAY_MS = 5000
def _flush_backlight(client): def _flush_backlight(client):
for i in range(num_gauges): for i in range(num_gauges):
@@ -343,11 +361,7 @@ def _flush_backlight(client):
}, },
"brightness": int(backlight_brightness[i] * 2.55), "brightness": int(backlight_brightness[i] * 2.55),
} }
client.publish( client.publish(gt["led_bl_state"], ujson.dumps(payload), retain=True)
f"{gt['set'].replace('/set', '/backlight/state')}",
ujson.dumps(payload),
retain=True,
)
info( info(
f"Gauge {i} backlight: {payload['state']} {backlight_color[i]} @ {backlight_brightness[i]}%" f"Gauge {i} backlight: {payload['state']} {backlight_color[i]} @ {backlight_brightness[i]}%"
) )
@@ -379,9 +393,13 @@ def set_backlight_color(gauge_idx, r, g, b, brightness=None):
backlight_on[gauge_idx] = new_on backlight_on[gauge_idx] = new_on
scale = brightness / 100 scale = brightness / 100
base_idx = gauge_idx * BACKLIGHT_LEDS_PER_GAUGE leds_per_gauge = BACKLIGHT_LEDS_PER_GAUGE + STATUS_LEDS_PER_GAUGE
base_idx = gauge_idx * leds_per_gauge
for j in range(BACKLIGHT_LEDS_PER_GAUGE): for j in range(BACKLIGHT_LEDS_PER_GAUGE):
if leds_bl:
leds_bl[base_idx + j] = (int(g * scale), int(r * scale), int(b * scale)) leds_bl[base_idx + j] = (int(g * scale), int(r * scale), int(b * scale))
_update_status_leds(gauge_idx)
if leds_bl:
leds_bl.write() leds_bl.write()
_mark_bl_dirty() _mark_bl_dirty()
@@ -397,13 +415,49 @@ def set_backlight_brightness(gauge_idx, brightness):
backlight_on[gauge_idx] = new_on backlight_on[gauge_idx] = new_on
r, g, b = backlight_color[gauge_idx] r, g, b = backlight_color[gauge_idx]
scale = clamped / 100 scale = clamped / 100
base_idx = gauge_idx * BACKLIGHT_LEDS_PER_GAUGE leds_per_gauge = BACKLIGHT_LEDS_PER_GAUGE + STATUS_LEDS_PER_GAUGE
base_idx = gauge_idx * leds_per_gauge
for j in range(BACKLIGHT_LEDS_PER_GAUGE): for j in range(BACKLIGHT_LEDS_PER_GAUGE):
if leds_bl:
leds_bl[base_idx + j] = (int(g * scale), int(r * scale), int(b * scale)) leds_bl[base_idx + j] = (int(g * scale), int(r * scale), int(b * scale))
_update_status_leds(gauge_idx)
if leds_bl:
leds_bl.write() leds_bl.write()
_mark_bl_dirty() _mark_bl_dirty()
def _update_status_leds(gauge_idx):
if not leds_bl:
return
leds_per_gauge = BACKLIGHT_LEDS_PER_GAUGE + STATUS_LEDS_PER_GAUGE
base_idx = gauge_idx * leds_per_gauge + BACKLIGHT_LEDS_PER_GAUGE
g_cfg = gauges[gauge_idx]
red_color = g_cfg["ws2812_red"]
green_color = g_cfg["ws2812_green"]
if status_led_red[gauge_idx]:
leds_bl[base_idx] = (red_color[1], red_color[0], red_color[2])
else:
leds_bl[base_idx] = (0, 0, 0)
if status_led_green[gauge_idx]:
leds_bl[base_idx + 1] = (green_color[1], green_color[0], green_color[2])
else:
leds_bl[base_idx + 1] = (0, 0, 0)
def set_status_led(gauge_idx, led_type, state):
global status_led_red, status_led_green
if led_type == "red":
status_led_red[gauge_idx] = state
elif led_type == "green":
status_led_green[gauge_idx] = state
_update_status_leds(gauge_idx)
if leds_bl:
leds_bl.write()
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# State # State
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -513,6 +567,20 @@ def on_message(topic, payload):
info(f"Gauge {i} backlight → #{r:02x}{g:02x}{b:02x} @ {brightness}%") info(f"Gauge {i} backlight → #{r:02x}{g:02x}{b:02x} @ {brightness}%")
return return
if topic == gt["status_red"]:
state = payload.upper() == "ON"
set_status_led(i, "red", state)
_publish(gt["status_red_state"], "ON" if state else "OFF", retain=True)
info(f"Gauge {i} status red → {'ON' if state else 'OFF'}")
return
if topic == gt["status_green"]:
state = payload.upper() == "ON"
set_status_led(i, "green", state)
_publish(gt["status_green_state"], "ON" if state else "OFF", retain=True)
info(f"Gauge {i} status green → {'ON' if state else 'OFF'}")
return
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# MQTT connect + discovery # MQTT connect + discovery
@@ -530,7 +598,7 @@ def connect_mqtt():
password=MQTT_PASSWORD, password=MQTT_PASSWORD,
keepalive=60, keepalive=60,
) )
client.set_last_will(T_STATUS, b"offline", retain=True, qos=0) client.set_last_will(gauge_topics[0]["status"], b"offline", retain=True, qos=0)
client.set_callback(on_message) client.set_callback(on_message)
client.connect() client.connect()
client_ref = client client_ref = client
@@ -542,6 +610,8 @@ def connect_mqtt():
client.subscribe(gt["led_red"]) client.subscribe(gt["led_red"])
client.subscribe(gt["led_green"]) client.subscribe(gt["led_green"])
client.subscribe(gt["led_bl"]) client.subscribe(gt["led_bl"])
client.subscribe(gt["status_red"])
client.subscribe(gt["status_green"])
_mqtt_connected = True _mqtt_connected = True
info(f"MQTT connected client_id={MQTT_CLIENT_ID}") info(f"MQTT connected client_id={MQTT_CLIENT_ID}")
return client return client
@@ -581,7 +651,9 @@ def check_mqtt():
password=MQTT_PASSWORD, password=MQTT_PASSWORD,
keepalive=60, keepalive=60,
) )
client_ref.set_last_will(T_STATUS, b"offline", retain=True, qos=0) client_ref.set_last_will(
gauge_topics[0]["status"], b"offline", retain=True, qos=0
)
client_ref.set_callback(on_message) client_ref.set_callback(on_message)
client_ref.connect() client_ref.connect()
client_ref.subscribe(T_SET) client_ref.subscribe(T_SET)
@@ -592,6 +664,8 @@ def check_mqtt():
client_ref.subscribe(gt["led_red"]) client_ref.subscribe(gt["led_red"])
client_ref.subscribe(gt["led_green"]) client_ref.subscribe(gt["led_green"])
client_ref.subscribe(gt["led_bl"]) client_ref.subscribe(gt["led_bl"])
client_ref.subscribe(gt["status_red"])
client_ref.subscribe(gt["status_green"])
_mqtt_connected = True _mqtt_connected = True
info("MQTT reconnected!") info("MQTT reconnected!")
publish_discovery(client_ref) publish_discovery(client_ref)
@@ -696,6 +770,44 @@ def publish_discovery(client):
) )
info(f"Discovery: gauge {i} backlight") info(f"Discovery: gauge {i} backlight")
client.publish(
gt["status_red_disc"],
ujson.dumps(
{
"name": f"{g['name']} Status Red",
"uniq_id": f"{MQTT_CLIENT_ID}_g{i}_status_red",
"cmd_t": gt["status_red"],
"stat_t": gt["status_red_state"],
"pl_on": "ON",
"pl_off": "OFF",
"icon": "mdi:led-on",
"dev": _dev_ref,
"ret": True,
}
),
retain=True,
)
info(f"Discovery: gauge {i} status red")
client.publish(
gt["status_green_disc"],
ujson.dumps(
{
"name": f"{g['name']} Status Green",
"uniq_id": f"{MQTT_CLIENT_ID}_g{i}_status_green",
"cmd_t": gt["status_green"],
"stat_t": gt["status_green_state"],
"pl_on": "ON",
"pl_off": "OFF",
"icon": "mdi:led-on",
"dev": _dev_ref,
"ret": True,
}
),
retain=True,
)
info(f"Discovery: gauge {i} status green")
def publish_state(client): def publish_state(client):
for i, g in enumerate(gauge_objects): for i, g in enumerate(gauge_objects):
@@ -712,21 +824,29 @@ def publish_state(client):
def main(): def main():
utime.sleep_ms(0)
info("=" * 48) info("=" * 48)
info("Gauge MQTT controller starting") info("Gauge MQTT controller starting")
info("=" * 48) info("=" * 48)
info("Connecting WiFi...")
connect_wifi(WIFI_SSID, WIFI_PASSWORD) connect_wifi(WIFI_SSID, WIFI_PASSWORD)
info("WiFi done")
info("Zeroing gauges on startup ...") info("Zeroing gauges on startup ...")
for i, g in enumerate(gauge_objects): for i, g in enumerate(gauge_objects):
g.zero() g.zero()
info(f"Zeroed gauge {i}") info(f"Zeroed gauge {i}")
info("Zero complete") info("Zero done")
info("Connecting MQTT...")
connect_mqtt() connect_mqtt()
info("MQTT done")
info("Publishing discovery...")
publish_discovery(client_ref) publish_discovery(client_ref)
publish_state(client_ref) publish_state(client_ref)
info("Discovery done")
info("Entering main loop") info("Entering main loop")
info("-" * 48) info("-" * 48)
@@ -743,6 +863,7 @@ def main():
last_heartbeat = utime.ticks_ms() last_heartbeat = utime.ticks_ms()
while True: while True:
utime.sleep_ms(0)
try: try:
check_wifi() check_wifi()

74
ota.py
View File

@@ -82,31 +82,55 @@ OTA_MANIFEST = "ota_manifest.txt"
# Logging # Logging
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _ts(): def _ts():
ms = utime.ticks_ms() ms = utime.ticks_ms()
return f"{(ms//3600000)%24:02d}:{(ms//60000)%60:02d}:{(ms//1000)%60:02d}.{ms%1000:03d}" return f"{(ms // 3600000) % 24:02d}:{(ms // 60000) % 60:02d}:{(ms // 1000) % 60:02d}.{ms % 1000:03d}"
def _log(level, msg):
print(f"[{_ts()}] {level:5s} [OTA] {msg}")
def info(msg):
_log("INFO", msg)
def warn(msg):
_log("WARN", msg)
def log_err(msg):
_log("ERROR", msg)
def _log(level, msg): print(f"[{_ts()}] {level:5s} [OTA] {msg}")
def info(msg): _log("INFO", msg)
def warn(msg): _log("WARN", msg)
def log_err(msg): _log("ERROR", msg)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# HTTP helpers # HTTP helpers
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _headers(): def _headers():
h = {"Accept": "application/json"} h = {"Accept": "application/json"}
if API_TOKEN: if API_TOKEN:
h["Authorization"] = f"token {API_TOKEN}" h["Authorization"] = f"token {API_TOKEN}"
return h return h
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Config loader # Config loader
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def load_config(): def load_config():
global GITEA_BASE, REPO_OWNER, REPO_NAME, REPO_FOLDER, REPO_BRANCH, API_TOKEN, WIFI_SSID, WIFI_PASSWORD global \
GITEA_BASE, \
REPO_OWNER, \
REPO_NAME, \
REPO_FOLDER, \
REPO_BRANCH, \
API_TOKEN, \
WIFI_SSID, \
WIFI_PASSWORD
try: try:
with open(SETTINGS_FILE) as f: with open(SETTINGS_FILE) as f:
cfg = ujson.load(f) cfg = ujson.load(f)
@@ -124,10 +148,12 @@ def load_config():
except Exception as e: except Exception as e:
warn(f"Config parse error: {e} — using defaults") warn(f"Config parse error: {e} — using defaults")
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Helpers # Helpers
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _match_pattern(name, pattern): def _match_pattern(name, pattern):
if "*" not in pattern: if "*" not in pattern:
return name == pattern return name == pattern
@@ -150,11 +176,9 @@ def _match_pattern(name, pattern):
i += 1 i += 1
return i == n and j == m return i == n and j == m
def _fetch_commit_sha(): def _fetch_commit_sha():
url = ( url = f"{GITEA_BASE}/api/v1/repos/{REPO_OWNER}/{REPO_NAME}/branches/{REPO_BRANCH}"
f"{GITEA_BASE}/api/v1/repos/{REPO_OWNER}/{REPO_NAME}"
f"/branches/{REPO_BRANCH}"
)
try: try:
r = urequests.get(url, headers=_headers()) r = urequests.get(url, headers=_headers())
if r.status_code == 200: if r.status_code == 200:
@@ -166,6 +190,7 @@ def _fetch_commit_sha():
log_err(f"Failed to fetch commit: {e}") log_err(f"Failed to fetch commit: {e}")
return None return None
def _fetch_manifest(): def _fetch_manifest():
url = ( url = (
f"{GITEA_BASE}/api/v1/repos/{REPO_OWNER}/{REPO_NAME}" f"{GITEA_BASE}/api/v1/repos/{REPO_OWNER}/{REPO_NAME}"
@@ -178,6 +203,7 @@ def _fetch_manifest():
r.close() r.close()
if data.get("content"): if data.get("content"):
import ubinascii import ubinascii
content = ubinascii.a2b_base64(data["content"]).decode() content = ubinascii.a2b_base64(data["content"]).decode()
patterns = [line.strip() for line in content.splitlines()] patterns = [line.strip() for line in content.splitlines()]
return [p for p in patterns if p and not p.startswith("#")] return [p for p in patterns if p and not p.startswith("#")]
@@ -188,6 +214,7 @@ def _fetch_manifest():
log_err(f"Failed to fetch manifest: {e}") log_err(f"Failed to fetch manifest: {e}")
return None return None
def _fetch_dir(path): def _fetch_dir(path):
url = ( url = (
f"{GITEA_BASE}/api/v1/repos/{REPO_OWNER}/{REPO_NAME}" f"{GITEA_BASE}/api/v1/repos/{REPO_OWNER}/{REPO_NAME}"
@@ -195,6 +222,7 @@ def _fetch_dir(path):
) )
return _api_get(url) return _api_get(url)
def _api_get(url): def _api_get(url):
"""GET a URL and return parsed JSON, or None on failure.""" """GET a URL and return parsed JSON, or None on failure."""
try: try:
@@ -209,6 +237,7 @@ def _api_get(url):
log_err(f"GET {url} failed: {e}") log_err(f"GET {url} failed: {e}")
return None return None
def _download(url, dest_path): def _download(url, dest_path):
"""Download url to dest_path. Returns True on success.""" """Download url to dest_path. Returns True on success."""
tmp = dest_path + ".tmp" tmp = dest_path + ".tmp"
@@ -236,6 +265,7 @@ def _download(url, dest_path):
pass pass
return False return False
def _load_manifest(): def _load_manifest():
try: try:
with open(MANIFEST_FILE) as f: with open(MANIFEST_FILE) as f:
@@ -243,6 +273,7 @@ def _load_manifest():
except Exception: except Exception:
return {} return {}
def _save_manifest(manifest, commit_sha=None): def _save_manifest(manifest, commit_sha=None):
try: try:
with open(MANIFEST_FILE, "w") as f: with open(MANIFEST_FILE, "w") as f:
@@ -252,6 +283,7 @@ def _save_manifest(manifest, commit_sha=None):
except Exception as e: except Exception as e:
warn(f"Could not save manifest: {e}") warn(f"Could not save manifest: {e}")
def _wipe_manifest(): def _wipe_manifest():
try: try:
os.remove(MANIFEST_FILE) os.remove(MANIFEST_FILE)
@@ -259,6 +291,7 @@ def _wipe_manifest():
except OSError: except OSError:
pass pass
def _ok_flag_exists(): def _ok_flag_exists():
try: try:
os.stat(OK_FLAG_FILE) os.stat(OK_FLAG_FILE)
@@ -266,12 +299,14 @@ def _ok_flag_exists():
except OSError: except OSError:
return False return False
def _clear_ok_flag(): def _clear_ok_flag():
try: try:
os.remove(OK_FLAG_FILE) os.remove(OK_FLAG_FILE)
except OSError: except OSError:
pass pass
def mark_ok(): def mark_ok():
""" """
Call this from main.py after successful startup. Call this from main.py after successful startup.
@@ -283,10 +318,12 @@ def mark_ok():
except Exception as e: except Exception as e:
warn(f"Could not write OK flag: {e}") warn(f"Could not write OK flag: {e}")
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Core update logic # Core update logic
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _fetch_file_list(): def _fetch_file_list():
""" """
Returns list of {name, sha, download_url} dicts based on the Returns list of {name, sha, download_url} dicts based on the
@@ -319,11 +356,13 @@ def _fetch_file_list():
if _match_pattern(name, p) or _match_pattern(entry["path"], p): if _match_pattern(name, p) or _match_pattern(entry["path"], p):
if entry["path"] not in visited: if entry["path"] not in visited:
visited.add(entry["path"]) visited.add(entry["path"])
files.append({ files.append(
{
"name": entry["path"], "name": entry["path"],
"sha": entry["sha"], "sha": entry["sha"],
"download_url": entry["download_url"], "download_url": entry["download_url"],
}) }
)
break break
root = _fetch_dir(REPO_FOLDER) root = _fetch_dir(REPO_FOLDER)
@@ -333,12 +372,15 @@ def _fetch_file_list():
fetch_matching(root, manifest_patterns) fetch_matching(root, manifest_patterns)
return files return files
def _do_update(commit_sha=None): def _do_update(commit_sha=None):
""" """
Fetch file list, download changed files, update manifest. Fetch file list, download changed files, update manifest.
Returns True if all succeeded (or nothing needed updating). Returns True if all succeeded (or nothing needed updating).
""" """
info(f"Checking {GITEA_BASE}/{REPO_OWNER}/{REPO_NAME}/{REPO_FOLDER} @ {REPO_BRANCH}") info(
f"Checking {GITEA_BASE}/{REPO_OWNER}/{REPO_NAME}/{REPO_FOLDER} @ {REPO_BRANCH}"
)
file_list = _fetch_file_list() file_list = _fetch_file_list()
if file_list is None: if file_list is None:
log_err("Could not fetch file list — skipping update") log_err("Could not fetch file list — skipping update")
@@ -382,10 +424,12 @@ def _do_update(commit_sha=None):
return True return True
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Public entry point # Public entry point
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def update(): def update():
""" """
Main entry point. Call from boot.py before importing application code. Main entry point. Call from boot.py before importing application code.
@@ -408,9 +452,7 @@ def update():
if not ok_flag: if not ok_flag:
warn("OK flag missing — last boot may have failed") warn("OK flag missing — last boot may have failed")
warn("Wiping manifest to force full re-fetch") warn("Re-checking all files, will only download changed ones")
_wipe_manifest()
manifest = {}
else: else:
info("OK flag present — last boot was good") info("OK flag present — last boot was good")