6 Commits

13 changed files with 67 additions and 68 deletions

BIN
3d_model/20-31.stl Normal file

Binary file not shown.

BIN
3d_model/20-35.stl Normal file

Binary file not shown.

BIN
3d_model/22-33.stl Normal file

Binary file not shown.

BIN
3d_model/24-31.stl Normal file

Binary file not shown.

BIN
3d_model/24-35.stl Normal file

Binary file not shown.

BIN
3d_model/Box for five.stl Normal file

Binary file not shown.

BIN
3d_model/Pointer.stl Normal file

Binary file not shown.

BIN
3d_model/VFD-Holder.stl Normal file

Binary file not shown.

View File

@@ -28,5 +28,5 @@
"red_led_entity_name": "Selsyn 1 Red LED", "red_led_entity_name": "Selsyn 1 Red LED",
"green_led_entity_name": "Selsyn 1 Green LED", "green_led_entity_name": "Selsyn 1 Green LED",
"backlight_entity_name": "Selsyn 1 Backlight", "backlight_entity_name": "Selsyn 1 Backlight",
"backlight_unit": "%" "ws2812_order": "GRB"
} }

View File

@@ -47,6 +47,7 @@
"pin": 23, "pin": 23,
"num_leds_per_gauge": 3, "num_leds_per_gauge": 3,
"num_status_leds_per_gauge": 2 "num_status_leds_per_gauge": 2
"ws2812_order": "GRB"
}, },
"device": { "device": {

View File

@@ -75,6 +75,7 @@ class Gauge:
self._current_step = 0 self._current_step = 0
self._zeroed = False self._zeroed = False
self._last_dir = None
def zero(self): def zero(self):
overrun = _OVERRUN_STEPS overrun = _OVERRUN_STEPS
@@ -128,6 +129,20 @@ class Gauge:
self._pin_step.value(0) self._pin_step.value(0)
utime.sleep_us(delay_us) utime.sleep_us(delay_us)
def steps_toward(self, value, limit=5, deadband=0.5):
"""Return the step delta needed to move toward value, clamped to ±limit.
deadband: If error is less than this fraction of one step, return 0 to prevent
micro-corrections due to floating-point rounding. Default 0.5 means
no movement if error < half a step.
"""
target_step = self._val_to_step(value)
delta = target_step - self._current_step
deadband_steps = deadband
if abs(delta) < deadband_steps:
return 0
return max(-limit, min(limit, delta))
def get(self): def get(self):
return self._step_to_val(self._current_step) return self._step_to_val(self._current_step)

View File

@@ -1,12 +1,12 @@
""" """
gaugemqtt.py — MQTT-based gauge controller for ESP32 / MicroPython gaugemqttcontinuous.py — MQTT-based gauge controller for ESP32 / MicroPython
Deploy these files to the ESP32: Deploy these files to the ESP32:
gauge.py — stepper driver gauge_vid6008.py — stepper driver
gaugemqtt.py — this file gaugemqttcontinuous.py — this file
umqtt/simple.py — MicroPython built-in umqtt/simple.py — MicroPython built-in
umqtt/robust.py — https://raw.githubusercontent.com/micropython/micropython-lib/master/micropython/umqtt.robust/umqtt/robust.py umqtt/robust.py — https://raw.githubusercontent.com/micropython/micropython-lib/master/micropython/umqtt.robust/umqtt/robust.py
config.json — configuration (see below) config.json — configuration (see below)
MQTT topics (all prefixed with mqtt_prefix from config.json): MQTT topics (all prefixed with mqtt_prefix from config.json):
.../set ← HA publishes target value here .../set ← HA publishes target value here
@@ -23,7 +23,6 @@ Serial log format: [HH:MM:SS.mmm] LEVEL message
import network import network
import utime import utime
import ujson import ujson
import urandom
from umqtt.robust import MQTTClient from umqtt.robust import MQTTClient
from machine import Pin from machine import Pin
from neopixel import NeoPixel from neopixel import NeoPixel
@@ -82,6 +81,7 @@ _cfg = _load_config()
DEBUG = _cfg.get("debug", False) DEBUG = _cfg.get("debug", False)
_DEBUG = DEBUG _DEBUG = DEBUG
_WS2812_ORDER = _cfg.get("ws2812_order", "GRB").upper()
WIFI_SSID = _cfg["wifi_ssid"] WIFI_SSID = _cfg["wifi_ssid"]
WIFI_PASSWORD = _cfg["wifi_password"] WIFI_PASSWORD = _cfg["wifi_password"]
@@ -150,7 +150,6 @@ else:
"ws2812_green": tuple(_cfg.get("ws2812_green", [0, 255, 0])), "ws2812_green": tuple(_cfg.get("ws2812_green", [0, 255, 0])),
} }
) )
BL_UNIT = _cfg.get("backlight_unit", "%")
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Gauge initialization # Gauge initialization
@@ -208,15 +207,8 @@ gauge_topics = [make_gauge_topics(MQTT_PREFIX, g["id"]) for g in gauges]
T_SET = f"{MQTT_PREFIX}/set" T_SET = f"{MQTT_PREFIX}/set"
T_STATE = f"{MQTT_PREFIX}/state"
T_STATUS = f"{MQTT_PREFIX}/status"
T_ZERO = f"{MQTT_PREFIX}/zero" T_ZERO = f"{MQTT_PREFIX}/zero"
T_DISC_GAUGE = f"homeassistant/number/{MQTT_CLIENT_ID}/config"
T_DISC_RED = f"homeassistant/switch/{MQTT_CLIENT_ID}_red/config"
T_DISC_GREEN = f"homeassistant/switch/{MQTT_CLIENT_ID}_green/config"
T_DISC_BL = f"homeassistant/light/{MQTT_CLIENT_ID}_bl/config"
_DEVICE = { _DEVICE = {
"identifiers": [MQTT_CLIENT_ID], "identifiers": [MQTT_CLIENT_ID],
@@ -230,7 +222,6 @@ _DEVICE = {
# WiFi # WiFi
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
_wifi_reconnect_delay_s = 5
_wifi_check_interval_ms = 30000 _wifi_check_interval_ms = 30000
_last_wifi_check = 0 _last_wifi_check = 0
_wifi_sta = None _wifi_sta = None
@@ -262,7 +253,7 @@ def connect_wifi(ssid, password, timeout_s=15):
def check_wifi(): def check_wifi():
global _wifi_sta, _last_wifi_check, _wifi_reconnect_delay_s global _wifi_sta, _last_wifi_check
now = utime.ticks_ms() now = utime.ticks_ms()
if utime.ticks_diff(now, _last_wifi_check) < _wifi_check_interval_ms: if utime.ticks_diff(now, _last_wifi_check) < _wifi_check_interval_ms:
return return
@@ -293,7 +284,6 @@ def check_wifi():
num_gauges = len(gauges) num_gauges = len(gauges)
leds_red = [] leds_red = []
leds_green = [] leds_green = []
leds_bl = []
for g in gauges: for g in gauges:
leds_red.append(Pin(g["red_pin"], Pin.OUT, value=0)) leds_red.append(Pin(g["red_pin"], Pin.OUT, value=0))
leds_green.append(Pin(g["green_pin"], Pin.OUT, value=0)) leds_green.append(Pin(g["green_pin"], Pin.OUT, value=0))
@@ -312,6 +302,13 @@ _bl_dirty_since = None
_BL_SAVE_DELAY_MS = 5000 _BL_SAVE_DELAY_MS = 5000
def _to_pixel(r, g, b):
"""Reorder RGB to match the WS2812 variant's byte order (GRB or RGB)."""
if _WS2812_ORDER == "GRB":
return (g, r, b)
return (r, g, b)
def _backlight_changed(gauge_idx, new_color, new_on, new_brightness): def _backlight_changed(gauge_idx, new_color, new_on, new_brightness):
return ( return (
new_color != backlight_color[gauge_idx] new_color != backlight_color[gauge_idx]
@@ -325,6 +322,19 @@ def _mark_bl_dirty():
_bl_dirty_since = utime.ticks_ms() _bl_dirty_since = utime.ticks_ms()
def _apply_backlight(gauge_idx, r, g, b, brightness):
"""Write RGB+brightness to the physical LEDs and mark dirty."""
scale = brightness / 100
leds_per_gauge = BACKLIGHT_LEDS_PER_GAUGE + STATUS_LEDS_PER_GAUGE
base_idx = gauge_idx * leds_per_gauge
pixel = _to_pixel(int(r * scale), int(g * scale), int(b * scale))
for j in range(BACKLIGHT_LEDS_PER_GAUGE):
leds_bl[base_idx + j] = pixel
_update_status_leds(gauge_idx)
leds_bl.write()
_mark_bl_dirty()
def set_backlight_color(gauge_idx, r, g, b, brightness=None): def set_backlight_color(gauge_idx, r, g, b, brightness=None):
global backlight_color, backlight_brightness, backlight_on global backlight_color, backlight_brightness, backlight_on
if brightness is None: if brightness is None:
@@ -336,15 +346,7 @@ def set_backlight_color(gauge_idx, r, g, b, brightness=None):
if brightness > 0: if brightness > 0:
backlight_brightness[gauge_idx] = brightness backlight_brightness[gauge_idx] = brightness
backlight_on[gauge_idx] = new_on backlight_on[gauge_idx] = new_on
_apply_backlight(gauge_idx, r, g, b, brightness)
scale = brightness / 100
leds_per_gauge = BACKLIGHT_LEDS_PER_GAUGE + STATUS_LEDS_PER_GAUGE
base_idx = gauge_idx * leds_per_gauge
for j in range(BACKLIGHT_LEDS_PER_GAUGE):
leds_bl[base_idx + j] = (int(g * scale), int(r * scale), int(b * scale))
_update_status_leds(gauge_idx)
leds_bl.write()
_mark_bl_dirty()
def set_backlight_brightness(gauge_idx, brightness): def set_backlight_brightness(gauge_idx, brightness):
@@ -357,14 +359,7 @@ def set_backlight_brightness(gauge_idx, brightness):
backlight_brightness[gauge_idx] = clamped backlight_brightness[gauge_idx] = clamped
backlight_on[gauge_idx] = new_on backlight_on[gauge_idx] = new_on
r, g, b = backlight_color[gauge_idx] r, g, b = backlight_color[gauge_idx]
scale = clamped / 100 _apply_backlight(gauge_idx, r, g, b, clamped)
leds_per_gauge = BACKLIGHT_LEDS_PER_GAUGE + STATUS_LEDS_PER_GAUGE
base_idx = gauge_idx * leds_per_gauge
for j in range(BACKLIGHT_LEDS_PER_GAUGE):
leds_bl[base_idx + j] = (int(g * scale), int(r * scale), int(b * scale))
_update_status_leds(gauge_idx)
leds_bl.write()
_mark_bl_dirty()
def _update_status_leds(gauge_idx): def _update_status_leds(gauge_idx):
@@ -376,12 +371,12 @@ def _update_status_leds(gauge_idx):
green_color = g_cfg["ws2812_green"] green_color = g_cfg["ws2812_green"]
if status_led_red[gauge_idx]: if status_led_red[gauge_idx]:
leds_bl[base_idx] = (red_color[1], red_color[0], red_color[2]) leds_bl[base_idx] = _to_pixel(*red_color)
else: else:
leds_bl[base_idx] = (0, 0, 0) leds_bl[base_idx] = (0, 0, 0)
if status_led_green[gauge_idx]: if status_led_green[gauge_idx]:
leds_bl[base_idx + 1] = (green_color[1], green_color[0], green_color[2]) leds_bl[base_idx + 1] = _to_pixel(*green_color)
else: else:
leds_bl[base_idx + 1] = (0, 0, 0) leds_bl[base_idx + 1] = (0, 0, 0)
@@ -445,7 +440,6 @@ def _publish(topic, payload, retain=False):
def on_message(topic, payload): def on_message(topic, payload):
global backlight_brightness, backlight_color
if client_ref is None: if client_ref is None:
return return
topic = topic.decode() topic = topic.decode()
@@ -640,16 +634,16 @@ def check_mqtt():
port=MQTT_PORT, port=MQTT_PORT,
user=MQTT_USER, user=MQTT_USER,
password=MQTT_PASSWORD, password=MQTT_PASSWORD,
keepalive=60, keepalive=30,
) )
client_ref.set_last_will(T_STATUS, b"offline", retain=True, qos=0)
client_ref.set_callback(on_message) client_ref.set_callback(on_message)
client_ref.connect() client_ref.connect()
_subscribe_all(client_ref)
_mqtt_connected = True _mqtt_connected = True
info("MQTT reconnected!") info("MQTT reconnected!")
publish_discovery(client_ref) publish_discovery(client_ref)
_subscribe_all(client_ref)
publish_state(client_ref) publish_state(client_ref)
publish_backlight_states(client_ref)
return True return True
except Exception as e2: except Exception as e2:
log_err(f"MQTT reconnect attempt {attempt + 1} failed: {e2}") log_err(f"MQTT reconnect attempt {attempt + 1} failed: {e2}")
@@ -871,12 +865,9 @@ def main():
continue continue
client_ref.check_msg() client_ref.check_msg()
_flush_backlight_state()
pending = [] pending = [g.steps_toward(gauge_targets[i],limit=50) for i, g in enumerate(gauge_objects)]
for i, g in enumerate(gauge_objects):
delta = g._val_to_step(gauge_targets[i]) - g._current_step
steps = max(-5, min(5, delta))
pending.append(steps)
moved_any = any(s != 0 for s in pending) moved_any = any(s != 0 for s in pending)
if moved_any: if moved_any:
@@ -886,7 +877,7 @@ def main():
for i, g in enumerate(gauge_objects): for i, g in enumerate(gauge_objects):
if tick < abs(pending[i]): if tick < abs(pending[i]):
g.step(1 if pending[i] > 0 else -1) g.step(1 if pending[i] > 0 else -1)
utime.sleep_us(delay_us) utime.sleep_us(delay_us)
else: else:
if was_moving: if was_moving:
publish_state(client_ref) publish_state(client_ref)

32
ota.py
View File

@@ -198,18 +198,19 @@ def _fetch_manifest():
) )
try: try:
r = urequests.get(url, headers=_headers()) r = urequests.get(url, headers=_headers())
if r.status_code == 200: try:
data = r.json() if r.status_code == 200:
r.close() data = r.json()
if data.get("content"): if data.get("content"):
import ubinascii import ubinascii
content = ubinascii.a2b_base64(data["content"]).decode() content = ubinascii.a2b_base64(data["content"]).decode()
patterns = [line.strip() for line in content.splitlines()] patterns = [line.strip() for line in content.splitlines()]
return [p for p in patterns if p and not p.startswith("#")] return [p for p in patterns if p and not p.startswith("#")]
else: else:
warn(f"Manifest not found at {OTA_MANIFEST}") warn(f"Manifest not found at {OTA_MANIFEST}")
r.close() finally:
r.close()
except Exception as e: except Exception as e:
log_err(f"Failed to fetch manifest: {e}") log_err(f"Failed to fetch manifest: {e}")
return None return None
@@ -284,13 +285,6 @@ def _save_manifest(manifest, commit_sha=None):
warn(f"Could not save manifest: {e}") warn(f"Could not save manifest: {e}")
def _wipe_manifest():
try:
os.remove(MANIFEST_FILE)
info("Manifest wiped — full re-fetch on next update")
except OSError:
pass
def _ok_flag_exists(): def _ok_flag_exists():
try: try:
@@ -349,8 +343,6 @@ def _fetch_file_list():
break break
else: else:
name = entry["name"] name = entry["name"]
if not name.endswith(".py"):
continue
for p in patterns: for p in patterns:
p = p.rstrip("/") p = p.rstrip("/")
if _match_pattern(name, p) or _match_pattern(entry["path"], p): if _match_pattern(name, p) or _match_pattern(entry["path"], p):