"""
ota.py — Gitea OTA updater for ESP32 / MicroPython

Call ota.update() from boot.py before importing anything else.
If the update or the subsequent boot fails, the updater retries on the next
boot rather than bricking the device.

Strategy
--------
1. Check if last boot was good (OK flag exists).
2. If good, fetch remote commit SHA and compare with local — if unchanged,
   skip file check entirely.
3. If new commit or failed boot, fetch ota_manifest.txt from the repo to
   determine which files to sync.
4. Compare SHA1 hashes with a local manifest (.ota_manifest.json).
5. Download only changed or missing files, writing to .tmp first.
6. On success, rename .tmp files into place and update the manifest.
7. If anything fails mid-update, the commit marker is not recorded in the
   manifest, so the next boot will retry. Partially written .tmp files are
   cleaned up.
8. A "safety" flag file (.ota_ok) is written by main.py on successful
   startup. If it is absent on boot, the previous update is suspected bad —
   the manifest is wiped so all files are re-fetched cleanly.

Manifest format (ota_manifest.txt)
---------------------------------
Each line specifies a file or directory to include:

    boot.py        # specific file
    ota.py         # another file
    selsyn/        # entire directory (trailing slash)
    lib/           # another directory
    *.py           # wildcard (matches anywhere)
    selsyn/*.py    # wildcard in subdirectory

Usage in boot.py
----------------
    import ota
    ota.update()
    # imports of main etc. go here

Configuration
-------------
Edit the block below, or override from a local config file (see SETTINGS_FILE).
All settings can be left as module-level constants or placed in
/ota_config.json:

    {
        "gitea_base": "http://git.baumann.gr",
        "repo_owner": "adebaumann",
        "repo_name": "HomeControlPanel",
        "repo_folder": "firmware",
        "repo_branch": "main",
        "api_token": "nicetry-nothere"
    }
"""

import os
import gc
import sys
import ujson
import urequests
import utime

# ---------------------------------------------------------------------------
# Default configuration — override via /ota_config.json
# ---------------------------------------------------------------------------
GITEA_BASE = "http://git.baumann.gr"  # no trailing slash
REPO_OWNER = "adrian"
REPO_NAME = "esp32-gauge"
REPO_FOLDER = "firmware"  # folder inside repo to sync
REPO_BRANCH = "main"
API_TOKEN = None  # set to string for private repos
WIFI_SSID = None
WIFI_PASSWORD = None

SETTINGS_FILE = "/ota_config.json"
MANIFEST_FILE = "/.ota_manifest.json"
OK_FLAG_FILE = "/.ota_ok"
OTA_MANIFEST = "ota_manifest.txt"


# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
def _ts():
    """Return the millisecond tick counter formatted as HH:MM:SS.mmm."""
    ms = utime.ticks_ms()
    return f"{(ms//3600000)%24:02d}:{(ms//60000)%60:02d}:{(ms//1000)%60:02d}.{ms%1000:03d}"


def _log(level, msg):
    """Print one log line tagged with a timestamp, the level and [OTA]."""
    print(f"[{_ts()}] {level:5s} [OTA] {msg}")


def info(msg):
    """Log msg at INFO level."""
    _log("INFO", msg)


def warn(msg):
    """Log msg at WARN level."""
    _log("WARN", msg)


def log_err(msg):
    """Log msg at ERROR level."""
    _log("ERROR", msg)


# ---------------------------------------------------------------------------
# HTTP helpers
# ---------------------------------------------------------------------------
def _headers():
    """Build the request headers, adding the API token when one is set."""
    h = {"Accept": "application/json"}
    if API_TOKEN:
        h["Authorization"] = f"token {API_TOKEN}"
    return h


def _close_response(r):
    """Close an HTTP response, ignoring errors; r may be None."""
    if r is None:
        return
    try:
        r.close()
    except Exception:
        # Closing is best-effort: a failure here must not mask the result
        # (or the error) of the request itself.
        pass


# ---------------------------------------------------------------------------
# Config loader
# ---------------------------------------------------------------------------
def load_config():
    """
    Override the module-level defaults from SETTINGS_FILE if it exists.

    A missing file or a parse error is logged and the current values are kept.
    """
    global GITEA_BASE, REPO_OWNER, REPO_NAME, REPO_FOLDER, REPO_BRANCH, API_TOKEN, WIFI_SSID, WIFI_PASSWORD
    try:
        with open(SETTINGS_FILE) as f:
            cfg = ujson.load(f)
        GITEA_BASE = cfg.get("gitea_base", GITEA_BASE)
        REPO_OWNER = cfg.get("repo_owner", REPO_OWNER)
        REPO_NAME = cfg.get("repo_name", REPO_NAME)
        REPO_FOLDER = cfg.get("repo_folder", REPO_FOLDER)
        REPO_BRANCH = cfg.get("repo_branch", REPO_BRANCH)
        API_TOKEN = cfg.get("api_token", API_TOKEN)
        WIFI_SSID = cfg.get("wifi_ssid", WIFI_SSID)
        WIFI_PASSWORD = cfg.get("wifi_password", WIFI_PASSWORD)
        info(f"Config loaded from {SETTINGS_FILE}")
    except OSError:
        info(f"No {SETTINGS_FILE} found — using defaults")
    except Exception as e:
        warn(f"Config parse error: {e} — using defaults")


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _match_pattern(name, pattern):
    """
    Return True if name matches pattern.

    A pattern without "*" must equal name exactly. Otherwise each "*" matches
    any run of characters (including none, and including "/").
    """
    if "*" not in pattern:
        return name == pattern

    i = j = 0
    n, m = len(pattern), len(name)
    star = -1  # index in pattern of the most recent "*"
    mark = 0   # index in name where that "*" currently stops absorbing

    while j < m:
        if i < n and pattern[i] == "*":
            # Remember the star and where it started so we can backtrack.
            star = i
            mark = j
            i += 1
        elif i < n and pattern[i] == name[j]:
            i += 1
            j += 1
        elif star >= 0:
            # Mismatch (or pattern exhausted): let the last "*" swallow one
            # more character of name and retry from just after the star.
            mark += 1
            j = mark
            i = star + 1
        else:
            return False

    # Name is exhausted; only trailing stars may remain in the pattern.
    while i < n and pattern[i] == "*":
        i += 1
    return i == n


def _ensure_parent_dirs(path):
    """Create any missing parent directories of path (best effort)."""
    prefix = "/" if path.startswith("/") else ""
    current = ""
    for part in path.split("/")[:-1]:
        if not part:
            continue
        current = (current + "/" + part) if current else (prefix + part)
        try:
            os.mkdir(current)
        except OSError:
            # Usually "already exists". If creation really failed, the
            # subsequent open() reports the error to the caller.
            pass


def _fetch_commit_sha():
    """Return the head commit hash of REPO_BRANCH, or None on failure."""
    url = (
        f"{GITEA_BASE}/api/v1/repos/{REPO_OWNER}/{REPO_NAME}"
        f"/branches/{REPO_BRANCH}"
    )
    info(f"Fetching commit from {url}")
    r = None
    try:
        r = urequests.get(url, headers=_headers())
        info(f"Response status: {r.status_code}")
        if r.status_code == 200:
            commit = r.json().get("commit") or {}
            # NOTE(review): Gitea appears to report the hash under "id",
            # GitHub-style payloads under "sha"; accept either.
            sha = commit.get("sha") or commit.get("id")
            info(f"Got commit: {sha}")
            return sha
    except Exception as e:
        log_err(f"Failed to fetch commit: {e}")
    finally:
        _close_response(r)
    return None


def _fetch_manifest():
    """
    Fetch OTA_MANIFEST from the repo and return its patterns.

    Returns a list of non-empty, non-comment lines, or None when the manifest
    is missing, empty or could not be fetched.
    """
    url = (
        f"{GITEA_BASE}/api/v1/repos/{REPO_OWNER}/{REPO_NAME}"
        f"/contents/{OTA_MANIFEST}?ref={REPO_BRANCH}"
    )
    r = None
    try:
        r = urequests.get(url, headers=_headers())
        if r.status_code == 200:
            data = r.json()
            # Release the socket before decoding to keep peak memory low.
            _close_response(r)
            r = None
            if data.get("content"):
                import ubinascii
                content = ubinascii.a2b_base64(data["content"]).decode()
                patterns = [line.strip() for line in content.splitlines()]
                return [p for p in patterns if p and not p.startswith("#")]
        else:
            warn(f"Manifest not found at {OTA_MANIFEST}")
    except Exception as e:
        log_err(f"Failed to fetch manifest: {e}")
    finally:
        _close_response(r)
    return None


def _fetch_dir(path):
    """Return the parsed contents-API response for path, or None on failure."""
    url = (
        f"{GITEA_BASE}/api/v1/repos/{REPO_OWNER}/{REPO_NAME}"
        f"/contents/{path}?ref={REPO_BRANCH}"
    )
    return _api_get(url)


def _api_get(url):
    """GET a URL and return parsed JSON, or None on failure."""
    r = None
    try:
        r = urequests.get(url, headers=_headers())
        if r.status_code == 200:
            return r.json()
        warn(f"HTTP {r.status_code} for {url}")
    except Exception as e:
        log_err(f"GET {url} failed: {e}")
    finally:
        _close_response(r)
    return None


def _download(url, dest_path):
    """Download url to dest_path. Returns True on success."""
    tmp = dest_path + ".tmp"
    r = None
    try:
        r = urequests.get(url, headers=_headers())
        if r.status_code != 200:
            warn(f"Download failed HTTP {r.status_code}: {url}")
            return False
        # Files inside sub-directories need the directory to exist first.
        _ensure_parent_dirs(dest_path)
        with open(tmp, "wb") as f:
            f.write(r.content)
        _close_response(r)
        r = None
        # Rename into place; remove the old file first because rename may
        # refuse to overwrite an existing destination.
        try:
            os.remove(dest_path)
        except OSError:
            pass
        os.rename(tmp, dest_path)
        return True
    except Exception as e:
        log_err(f"Download error {url}: {e}")
        try:
            os.remove(tmp)
        except OSError:
            pass
        return False
    finally:
        _close_response(r)


def _load_manifest():
    """Return the local manifest dict, or {} if it is missing or invalid."""
    try:
        with open(MANIFEST_FILE) as f:
            data = ujson.load(f)
    except Exception:
        return {}
    # Callers use .get(); anything that is not a dict is treated as empty.
    return data if isinstance(data, dict) else {}


def _save_manifest(manifest, commit_sha=None):
    """
    Write manifest to MANIFEST_FILE.

    When commit_sha is given it is stored under the "_commit" key first.
    Failures are logged, not raised.
    """
    try:
        with open(MANIFEST_FILE, "w") as f:
            if commit_sha:
                manifest["_commit"] = commit_sha
            ujson.dump(manifest, f)
    except Exception as e:
        warn(f"Could not save manifest: {e}")


def _wipe_manifest():
    """Delete the local manifest so every file is re-fetched."""
    try:
        os.remove(MANIFEST_FILE)
        info("Manifest wiped — full re-fetch on next update")
    except OSError:
        pass


def _ok_flag_exists():
    """Return True if the OK flag file is present."""
    try:
        os.stat(OK_FLAG_FILE)
        return True
    except OSError:
        return False


def _clear_ok_flag():
    """Remove the OK flag file if it exists."""
    try:
        os.remove(OK_FLAG_FILE)
    except OSError:
        pass


def mark_ok():
    """
    Call this from main.py after successful startup.
    Signals to the OTA updater that the last update was good.
    """
    try:
        with open(OK_FLAG_FILE, "w") as f:
            f.write("ok")
    except Exception as e:
        warn(f"Could not write OK flag: {e}")


# ---------------------------------------------------------------------------
# Core update logic
# ---------------------------------------------------------------------------
def _fetch_file_list():
    """
    Returns list of {name, sha, download_url} dicts based on the
    ota_manifest.txt patterns in the repo folder, or None on failure.

    Only entries whose name ends in ".py" are considered. Directories are
    descended into when a trailing-slash pattern matches their name or path.
    """
    manifest_patterns = _fetch_manifest()
    if manifest_patterns is None:
        log_err("No manifest — cannot determine what to fetch")
        return None
    info(f"Manifest patterns: {manifest_patterns}")

    files = []
    visited = set()
    unlisted = []  # directories whose listing could not be fetched

    def fetch_matching(entries, patterns):
        for entry in entries:
            if entry.get("type") == "dir":
                for p in patterns:
                    if not p.endswith("/"):
                        continue
                    dir_pat = p.rstrip("/")
                    # Full match, not prefix: "lib/" must not pull in
                    # a sibling directory such as "library".
                    if _match_pattern(entry["name"], dir_pat) or _match_pattern(entry["path"], dir_pat):
                        sub = _fetch_dir(entry["path"])
                        if isinstance(sub, list):
                            fetch_matching(sub, patterns)
                        else:
                            unlisted.append(entry["path"])
                        break
                continue

            name = entry["name"]
            if not name.endswith(".py"):
                continue
            for p in patterns:
                p = p.rstrip("/")
                if _match_pattern(name, p) or _match_pattern(entry["path"], p):
                    if entry["path"] not in visited:
                        visited.add(entry["path"])
                        files.append({
                            "name": entry["path"],
                            "sha": entry["sha"],
                            "download_url": entry["download_url"],
                        })
                    break

    root = _fetch_dir(REPO_FOLDER)
    if not isinstance(root, list):
        # None on HTTP failure; a non-list means REPO_FOLDER is not a folder.
        return None
    fetch_matching(root, manifest_patterns)

    if unlisted:
        # An incomplete listing must not be reported as a complete one,
        # otherwise the missing files would never be retried.
        log_err(f"Could not list directories: {unlisted}")
        return None
    return files


def _do_update(commit_sha=None):
    """
    Fetch file list, download changed files, update manifest.
    Returns True if all succeeded (or nothing needed updating).

    The commit hash is recorded in the manifest only when every file
    succeeded, so a partial failure is retried on the next boot.
    """
    info(f"Checking {GITEA_BASE}/{REPO_OWNER}/{REPO_NAME}/{REPO_FOLDER} @ {REPO_BRANCH}")

    file_list = _fetch_file_list()
    if file_list is None:
        log_err("Could not fetch file list — skipping update")
        return False

    info(f"Found {len(file_list)} file(s) to sync")

    manifest = _load_manifest()
    updated = []
    failed = []

    for entry in file_list:
        name = entry["name"]
        sha = entry["sha"]

        if manifest.get(name) == sha:
            info(f" {name} up to date")
            continue

        info(f" {name} updating (sha={sha[:8]}...)")
        gc.collect()  # free as much RAM as possible before the download
        ok = _download(entry["download_url"], f"/{name}")
        if ok:
            manifest[name] = sha
            updated.append(name)
            info(f" {name} OK")
        else:
            failed.append(name)
            log_err(f" {name} FAILED")

    if failed:
        log_err(f"Update incomplete — {len(failed)} file(s) failed: {failed}")
        # Keep the per-file progress but drop the commit marker so the
        # "commit unchanged" shortcut cannot skip the retry next boot.
        manifest.pop("_commit", None)
        _save_manifest(manifest)
        return False

    _save_manifest(manifest, commit_sha)

    if updated:
        info(f"Update complete — {len(updated)} file(s) updated: {updated}")
    else:
        info("All files up to date — nothing to do")
    return True


# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------
def update():
    """
    Main entry point. Call from boot.py before importing application code.

    - If the OK flag is missing, the previous boot is assumed to have failed —
      wipes the manifest so everything is re-fetched cleanly.
    - If the commit hash hasn't changed and last boot was good, skip file
      comparison entirely.
    - Runs the update.
    - Clears the OK flag so main.py must re-assert it on successful start.

    Unexpected errors during the update are logged and swallowed so that the
    device still boots with whatever files it currently has.
    """
    info("=" * 40)
    info("OTA updater starting")
    info("=" * 40)

    load_config()

    ok_flag = _ok_flag_exists()
    manifest = _load_manifest()

    if not ok_flag:
        warn("OK flag missing — last boot may have failed")
        warn("Wiping manifest to force full re-fetch")
        _wipe_manifest()
        manifest = {}
    else:
        info("OK flag present — last boot was good")

    commit_sha = _fetch_commit_sha()

    if ok_flag and commit_sha and manifest.get("_commit") == commit_sha:
        info(f"Commit unchanged ({commit_sha[:8]}) — skipping file check")
        info("-" * 40)
        return

    if commit_sha:
        info(f"Remote commit: {commit_sha[:8]}")
    else:
        warn("Could not fetch remote commit — proceeding with file check")

    # Clear the flag now; main.py must call ota.mark_ok() to re-set it
    _clear_ok_flag()

    try:
        success = _do_update(commit_sha)
    except Exception as e:
        # Top-level boundary: an updater bug or malformed API response must
        # not stop boot.py from starting the application.
        log_err(f"Unexpected OTA failure: {e}")
        success = False

    if success:
        info("OTA check complete — booting application")
    else:
        warn("OTA check had errors — booting with current files")
    info("-" * 40)
    gc.collect()