From db3798dc07e6145640745d83b286890af99fe6cf Mon Sep 17 00:00:00 2001 From: "Adrian A. Baumann" Date: Sat, 11 Apr 2026 23:32:49 +0200 Subject: [PATCH] OTA now checks files by default instead of overwriting --- ota.py | 118 ++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 80 insertions(+), 38 deletions(-) diff --git a/ota.py b/ota.py index 46d47b4..d88f7ae 100644 --- a/ota.py +++ b/ota.py @@ -63,60 +63,84 @@ import utime # Default configuration — override via /ota_config.json # --------------------------------------------------------------------------- -GITEA_BASE = "http://git.baumann.gr" # no trailing slash -REPO_OWNER = "adrian" -REPO_NAME = "esp32-gauge" -REPO_FOLDER = "firmware" # folder inside repo to sync -REPO_BRANCH = "main" -API_TOKEN = None # set to string for private repos +GITEA_BASE = "http://git.baumann.gr" # no trailing slash +REPO_OWNER = "adrian" +REPO_NAME = "esp32-gauge" +REPO_FOLDER = "firmware" # folder inside repo to sync +REPO_BRANCH = "main" +API_TOKEN = None # set to string for private repos -WIFI_SSID = None +WIFI_SSID = None WIFI_PASSWORD = None SETTINGS_FILE = "/ota_config.json" MANIFEST_FILE = "/.ota_manifest.json" -OK_FLAG_FILE = "/.ota_ok" -OTA_MANIFEST = "ota_manifest.txt" +OK_FLAG_FILE = "/.ota_ok" +OTA_MANIFEST = "ota_manifest.txt" # --------------------------------------------------------------------------- # Logging # --------------------------------------------------------------------------- + def _ts(): ms = utime.ticks_ms() - return f"{(ms//3600000)%24:02d}:{(ms//60000)%60:02d}:{(ms//1000)%60:02d}.{ms%1000:03d}" + return f"{(ms // 3600000) % 24:02d}:{(ms // 60000) % 60:02d}:{(ms // 1000) % 60:02d}.{ms % 1000:03d}" + + +def _log(level, msg): + print(f"[{_ts()}] {level:5s} [OTA] {msg}") + + +def info(msg): + _log("INFO", msg) + + +def warn(msg): + _log("WARN", msg) + + +def log_err(msg): + _log("ERROR", msg) -def _log(level, msg): print(f"[{_ts()}] {level:5s} [OTA] {msg}") -def info(msg): _log("INFO", msg) -def warn(msg): _log("WARN", msg) -def log_err(msg): _log("ERROR", msg) # --------------------------------------------------------------------------- # HTTP helpers # --------------------------------------------------------------------------- + def _headers(): h = {"Accept": "application/json"} if API_TOKEN: h["Authorization"] = f"token {API_TOKEN}" return h + # --------------------------------------------------------------------------- # Config loader # --------------------------------------------------------------------------- + def load_config(): - global GITEA_BASE, REPO_OWNER, REPO_NAME, REPO_FOLDER, REPO_BRANCH, API_TOKEN, WIFI_SSID, WIFI_PASSWORD + global \ + GITEA_BASE, \ + REPO_OWNER, \ + REPO_NAME, \ + REPO_FOLDER, \ + REPO_BRANCH, \ + API_TOKEN, \ + WIFI_SSID, \ + WIFI_PASSWORD try: with open(SETTINGS_FILE) as f: cfg = ujson.load(f) - GITEA_BASE = cfg.get("gitea_base", GITEA_BASE) - REPO_OWNER = cfg.get("repo_owner", REPO_OWNER) - REPO_NAME = cfg.get("repo_name", REPO_NAME) - REPO_FOLDER = cfg.get("repo_folder", REPO_FOLDER) - REPO_BRANCH = cfg.get("repo_branch", REPO_BRANCH) - API_TOKEN = cfg.get("api_token", API_TOKEN) - WIFI_SSID = cfg.get("wifi_ssid", WIFI_SSID) + GITEA_BASE = cfg.get("gitea_base", GITEA_BASE) + REPO_OWNER = cfg.get("repo_owner", REPO_OWNER) + REPO_NAME = cfg.get("repo_name", REPO_NAME) + REPO_FOLDER = cfg.get("repo_folder", REPO_FOLDER) + REPO_BRANCH = cfg.get("repo_branch", REPO_BRANCH) + API_TOKEN = cfg.get("api_token", API_TOKEN) + WIFI_SSID = cfg.get("wifi_ssid", WIFI_SSID) WIFI_PASSWORD = cfg.get("wifi_password", WIFI_PASSWORD) info(f"Config loaded from {SETTINGS_FILE}") except OSError: @@ -124,10 +148,12 @@ def load_config(): except Exception as e: warn(f"Config parse error: {e} — using defaults") + # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- + def _match_pattern(name, pattern): if "*" not in pattern: return name == pattern @@ -150,11 +176,9 @@ def _match_pattern(name, pattern): i += 1 return i == n and j == m + def _fetch_commit_sha(): - url = ( - f"{GITEA_BASE}/api/v1/repos/{REPO_OWNER}/{REPO_NAME}" - f"/branches/{REPO_BRANCH}" - ) + url = f"{GITEA_BASE}/api/v1/repos/{REPO_OWNER}/{REPO_NAME}/branches/{REPO_BRANCH}" try: r = urequests.get(url, headers=_headers()) if r.status_code == 200: @@ -166,6 +190,7 @@ def _fetch_commit_sha(): log_err(f"Failed to fetch commit: {e}") return None + def _fetch_manifest(): url = ( f"{GITEA_BASE}/api/v1/repos/{REPO_OWNER}/{REPO_NAME}" @@ -178,6 +203,7 @@ def _fetch_manifest(): r.close() if data.get("content"): import ubinascii + content = ubinascii.a2b_base64(data["content"]).decode() patterns = [line.strip() for line in content.splitlines()] return [p for p in patterns if p and not p.startswith("#")] @@ -188,6 +214,7 @@ def _fetch_manifest(): log_err(f"Failed to fetch manifest: {e}") return None + def _fetch_dir(path): url = ( f"{GITEA_BASE}/api/v1/repos/{REPO_OWNER}/{REPO_NAME}" @@ -195,6 +222,7 @@ def _fetch_dir(path): ) return _api_get(url) + def _api_get(url): """GET a URL and return parsed JSON, or None on failure.""" try: @@ -209,6 +237,7 @@ def _api_get(url): log_err(f"GET {url} failed: {e}") return None + def _download(url, dest_path): """Download url to dest_path. Returns True on success.""" tmp = dest_path + ".tmp" @@ -236,6 +265,7 @@ def _download(url, dest_path): pass return False + def _load_manifest(): try: with open(MANIFEST_FILE) as f: @@ -243,6 +273,7 @@ def _load_manifest(): except Exception: return {} + def _save_manifest(manifest, commit_sha=None): try: with open(MANIFEST_FILE, "w") as f: @@ -252,6 +283,7 @@ def _save_manifest(manifest, commit_sha=None): except Exception as e: warn(f"Could not save manifest: {e}") + def _wipe_manifest(): try: os.remove(MANIFEST_FILE) @@ -259,6 +291,7 @@ def _wipe_manifest(): except OSError: pass + def _ok_flag_exists(): try: os.stat(OK_FLAG_FILE) @@ -266,12 +299,14 @@ def _ok_flag_exists(): except OSError: return False + def _clear_ok_flag(): try: os.remove(OK_FLAG_FILE) except OSError: pass + def mark_ok(): """ Call this from main.py after successful startup. @@ -283,10 +318,12 @@ def mark_ok(): except Exception as e: warn(f"Could not write OK flag: {e}") + # --------------------------------------------------------------------------- # Core update logic # --------------------------------------------------------------------------- + def _fetch_file_list(): """ Returns list of {name, sha, download_url} dicts based on the @@ -319,11 +356,13 @@ def _fetch_file_list(): if _match_pattern(name, p) or _match_pattern(entry["path"], p): if entry["path"] not in visited: visited.add(entry["path"]) - files.append({ - "name": entry["path"], - "sha": entry["sha"], - "download_url": entry["download_url"], - }) + files.append( + { + "name": entry["path"], + "sha": entry["sha"], + "download_url": entry["download_url"], + } + ) break root = _fetch_dir(REPO_FOLDER) @@ -333,12 +372,15 @@ def _fetch_file_list(): fetch_matching(root, manifest_patterns) return files + def _do_update(commit_sha=None): """ Fetch file list, download changed files, update manifest. Returns True if all succeeded (or nothing needed updating). """ - info(f"Checking {GITEA_BASE}/{REPO_OWNER}/{REPO_NAME}/{REPO_FOLDER} @ {REPO_BRANCH}") + info( + f"Checking {GITEA_BASE}/{REPO_OWNER}/{REPO_NAME}/{REPO_FOLDER} @ {REPO_BRANCH}" + ) file_list = _fetch_file_list() if file_list is None: log_err("Could not fetch file list — skipping update") @@ -346,12 +388,12 @@ def _do_update(commit_sha=None): info(f"Found {len(file_list)} file(s) to sync") manifest = _load_manifest() - updated = [] - failed = [] + updated = [] + failed = [] for entry in file_list: name = entry["name"] - sha = entry["sha"] + sha = entry["sha"] if manifest.get(name) == sha: info(f" {name} up to date") @@ -382,10 +424,12 @@ def _do_update(commit_sha=None): return True + # --------------------------------------------------------------------------- # Public entry point # --------------------------------------------------------------------------- + def update(): """ Main entry point. Call from boot.py before importing application code. @@ -408,9 +452,7 @@ def update(): if not ok_flag: warn("OK flag missing — last boot may have failed") - warn("Wiping manifest to force full re-fetch") - _wipe_manifest() - manifest = {} + warn("Re-checking all files, will only download changed ones") else: info("OK flag present — last boot was good")