dev #1

Merged
adebaumann merged 9 commits from dev into main 2026-04-11 17:47:38 +00:00
2 changed files with 218 additions and 97 deletions
Showing only changes of commit 8ee2d3155d - Show all commits

50
config.multi.example.json Normal file
View File

@@ -0,0 +1,50 @@
{
"wifi_ssid": "YourWiFiSSID",
"wifi_password": "YourWiFiPassword",
"mqtt_broker": "mqtt.example.com",
"mqtt_port": 1883,
"mqtt_user": "mqtt_user",
"mqtt_password": "mqtt_password",
"mqtt_client_id": "selsyn_multi",
"mqtt_prefix": "home/panels/chernobyl1",
"gauges": [
{
"name": "Gauge 1",
"pins": [12, 13],
"mode": "stepdir",
"min": 0,
"max": 7300,
"step_us": 200,
"entity_name": "Power 1",
"unit": "W"
},
{
"name": "Gauge 2",
"pins": [14, 15],
"mode": "stepdir",
"min": 0,
"max": 100,
"step_us": 200,
"entity_name": "Temperature",
"unit": "C"
}
],
"leds": {
"red_pin": 33,
"green_pin": 32,
"backlight_pin": 23
},
"device": {
"name": "Selsyn Multi Gauge",
"model": "Chernobyl Selsyn-inspired gauge",
"manufacturer": "AdeBaumann",
"area": "Control Panels"
},
"microsteps_per_second": 600,
"heartbeat_ms": 10000,
"rezero_interval_ms": 3600000
}

View File

@@ -83,34 +83,100 @@ MQTT_USER = _cfg["mqtt_user"]
MQTT_PASSWORD = _cfg["mqtt_password"]
MQTT_CLIENT_ID = _cfg["mqtt_client_id"]
MQTT_PREFIX = _cfg["mqtt_prefix"]
GAUGE_PINS = tuple(_cfg.get("gauge_pins", [12, 13]))
GAUGE_MODE = _cfg.get("gauge_mode", "stepdir")
GAUGE_MIN = float(_cfg.get("gauge_min", 0))
GAUGE_MAX = float(_cfg.get("gauge_max", 7300))
GAUGE_STEP_US = int(_cfg.get("gauge_step_us", 200))
MICROSTEPS_PER_SECOND = 600 # microsteps per second (adjustable)
MICROSTEPS_PER_SECOND = int(_cfg.get("microsteps_per_second", 600))
HEARTBEAT_MS = int(_cfg.get("heartbeat_ms", 10000))
REZERO_INTERVAL_MS = int(_cfg.get("rezero_interval_ms", 3600000))
LED_RED_PIN = int(_cfg.get("led_red_pin", 33))
LED_GREEN_PIN = int(_cfg.get("led_green_pin", 32))
LED_BL_PIN = int(_cfg.get("led_bl_pin", 23))
DEVICE_NAME = _cfg.get("device_name", "Selsyn 1")
DEVICE_MODEL = _cfg.get("device_model", "Chernobyl Selsyn-inspired gauge")
DEVICE_MFR = _cfg.get("device_manufacturer", "AdeBaumann")
DEVICE_AREA = _cfg.get("device_area", "Control Panels")
GAUGE_ENTITY = _cfg.get("gauge_entity_name", "Selsyn 1 Power")
GAUGE_UNIT = _cfg.get("gauge_unit", "W")
RED_ENTITY = _cfg.get("red_led_entity_name", "Selsyn 1 Red LED")
GREEN_ENTITY = _cfg.get("green_led_entity_name", "Selsyn 1 Green LED")
BL_ENTITY = _cfg.get("backlight_entity_name", "Selsyn 1 Backlight")
LED_RED_PIN = int(_cfg.get("leds", {}).get("red_pin", _cfg.get("led_red_pin", 33)))
LED_GREEN_PIN = int(
_cfg.get("leds", {}).get("green_pin", _cfg.get("led_green_pin", 32))
)
LED_BL_PIN = int(_cfg.get("leds", {}).get("backlight_pin", _cfg.get("led_bl_pin", 23)))
device_cfg = _cfg.get("device", {})
DEVICE_NAME = device_cfg.get("name", _cfg.get("device_name", "Selsyn Multi"))
DEVICE_MODEL = device_cfg.get(
"model", _cfg.get("device_model", "Chernobyl Selsyn-inspired gauge")
)
DEVICE_MFR = device_cfg.get(
"manufacturer", _cfg.get("device_manufacturer", "AdeBaumann")
)
DEVICE_AREA = device_cfg.get("area", _cfg.get("device_area", "Control Panels"))
gauges = []
if "gauges" in _cfg:
for i, g in enumerate(_cfg["gauges"]):
gauges.append(
{
"id": i,
"name": g.get("name", f"Gauge {i + 1}"),
"pins": tuple(g.get("pins", [12, 13])),
"mode": g.get("mode", "stepdir"),
"min": float(g.get("min", 0)),
"max": float(g.get("max", 100)),
"step_us": int(g.get("step_us", 200)),
"entity_name": g.get("entity_name", f"Gauge {i + 1}"),
"unit": g.get("unit", ""),
}
)
else:
gauges.append(
{
"id": 0,
"name": "Gauge 1",
"pins": tuple(_cfg.get("gauge_pins", [12, 13])),
"mode": _cfg.get("gauge_mode", "stepdir"),
"min": float(_cfg.get("gauge_min", 0)),
"max": float(_cfg.get("gauge_max", 7300)),
"step_us": int(_cfg.get("gauge_step_us", 200)),
"entity_name": _cfg.get("gauge_entity_name", "Selsyn 1 Power"),
"unit": _cfg.get("gauge_unit", "W"),
}
)
BL_ENTITY = _cfg.get("backlight_entity_name", "Selsyn Backlight")
BL_UNIT = _cfg.get("backlight_unit", "%")
# ---------------------------------------------------------------------------
# Topics
# Gauge initialization
# ---------------------------------------------------------------------------
gauge_objects = []
for g in gauges:
gauge_objects.append(
Gauge(
pins=g["pins"],
mode=g["mode"],
min_val=g["min"],
max_val=g["max"],
step_us=g["step_us"],
)
)
info(
f"Gauge {g['id']}: {g['name']} pins={g['pins']} mode={g['mode']} range=[{g['min']}, {g['max']}]"
)
gauge_targets = [g["min"] for g in gauges] # target value per gauge
gauge_last_rezero = [utime.ticks_ms() for _ in gauges]
# ---------------------------------------------------------------------------
# Topics (per-gauge)
# ---------------------------------------------------------------------------
def make_gauge_topics(prefix, gauge_id):
return {
"set": f"{prefix}/gauge{gauge_id}/set",
"state": f"{prefix}/gauge{gauge_id}/state",
"status": f"{prefix}/gauge{gauge_id}/status",
"zero": f"{prefix}/gauge{gauge_id}/zero",
"disc": f"homeassistant/number/{MQTT_CLIENT_ID}_g{gauge_id}/config",
}
gauge_topics = [make_gauge_topics(MQTT_PREFIX, g["id"]) for g in gauges]
T_SET = f"{MQTT_PREFIX}/set"
T_STATE = f"{MQTT_PREFIX}/state"
T_STATUS = f"{MQTT_PREFIX}/status"
@@ -333,12 +399,29 @@ def on_message(topic, payload):
payload = payload.decode().strip()
info(f"MQTT rx {topic} {payload}")
for i, gt in enumerate(gauge_topics):
if topic == gt["zero"]:
info(f"Zero command received for gauge {i}")
gauge_objects[i].zero()
gauge_last_rezero[i] = utime.ticks_ms()
info(f"Zero complete gauge {i}")
return
if topic == gt["set"]:
g = gauges[i]
try:
gauge_targets[i] = max(g["min"], min(g["max"], float(payload)))
info(f"Gauge {i} target → {gauge_targets[i]:.1f}")
except ValueError:
warn(f"Invalid set value for gauge {i}: '{payload}'")
return
if topic == T_ZERO:
global _last_rezero_ms
info("Zero command received")
gauge.zero()
_last_rezero_ms = utime.ticks_ms()
info("Zero complete")
for i, g in enumerate(gauge_objects):
info(f"Zeroing all gauges")
g.zero()
gauge_last_rezero[i] = utime.ticks_ms()
info("All gauges zeroed")
return
if topic == T_LED_RED:
@@ -394,14 +477,6 @@ def on_message(topic, payload):
info(f"Backlight → {color_hex} @ {brightness}%")
return
if topic == T_SET:
global _target_value
try:
_target_value = max(GAUGE_MIN, min(GAUGE_MAX, float(payload)))
info(f"New target → {_target_value:.1f}")
except ValueError:
warn(f"Invalid set value: '{payload}'")
# ---------------------------------------------------------------------------
# MQTT connect + discovery
@@ -428,6 +503,9 @@ def connect_mqtt():
client.subscribe(T_LED_RED)
client.subscribe(T_LED_GREEN)
client.subscribe(T_LED_BL)
for gt in gauge_topics:
client.subscribe(gt["set"])
client.subscribe(gt["zero"])
_mqtt_connected = True
info(f"MQTT connected client_id={MQTT_CLIENT_ID}")
return client
@@ -475,6 +553,9 @@ def check_mqtt():
client_ref.subscribe(T_LED_RED)
client_ref.subscribe(T_LED_GREEN)
client_ref.subscribe(T_LED_BL)
for gt in gauge_topics:
client_ref.subscribe(gt["set"])
client_ref.subscribe(gt["zero"])
_mqtt_connected = True
info("MQTT reconnected!")
publish_discovery(client_ref)
@@ -489,36 +570,37 @@ def check_mqtt():
def publish_discovery(client):
"""Publish all HA MQTT discovery payloads using short-form keys to stay under 512 bytes."""
# Full device block only on first payload; subsequent use identifiers-only ref
"""Publish all HA MQTT discovery payloads for gauges and LEDs."""
_dev_ref = {"identifiers": [MQTT_CLIENT_ID]}
client.publish(
T_DISC_GAUGE,
ujson.dumps(
{
"name": GAUGE_ENTITY,
"unique_id": MQTT_CLIENT_ID,
"cmd_t": T_SET,
"stat_t": T_STATE,
"avty_t": T_STATUS,
"min": GAUGE_MIN,
"max": GAUGE_MAX,
"step": 1,
"unit_of_meas": GAUGE_UNIT,
"icon": "mdi:gauge",
"dev": _DEVICE,
}
),
retain=True,
)
info("Discovery: gauge")
for i, g in enumerate(gauges):
gt = gauge_topics[i]
client.publish(
gt["disc"],
ujson.dumps(
{
"name": g["entity_name"],
"unique_id": f"{MQTT_CLIENT_ID}_g{i}",
"cmd_t": gt["set"],
"stat_t": gt["state"],
"avty_t": gt["status"],
"min": g["min"],
"max": g["max"],
"step": 1,
"unit_of_meas": g["unit"],
"icon": "mdi:gauge",
"dev": _dev_ref,
}
),
retain=True,
)
info(f"Discovery: gauge {i} ({g['name']})")
client.publish(
T_DISC_RED,
ujson.dumps(
{
"name": RED_ENTITY,
"name": "Red LED",
"uniq_id": f"{MQTT_CLIENT_ID}_led_red",
"cmd_t": T_LED_RED,
"stat_t": T_LED_RED_STATE,
@@ -537,7 +619,7 @@ def publish_discovery(client):
T_DISC_GREEN,
ujson.dumps(
{
"name": GREEN_ENTITY,
"name": "Green LED",
"uniq_id": f"{MQTT_CLIENT_ID}_led_green",
"cmd_t": T_LED_GREEN,
"stat_t": T_LED_GREEN_STATE,
@@ -573,10 +655,12 @@ def publish_discovery(client):
def publish_state(client):
val = gauge.get()
client.publish(T_STATE, str(round(val, 1)), retain=True)
client.publish(T_STATUS, "online", retain=True)
info(f"State published value={val:.1f} step={gauge._current_step}")
for i, g in enumerate(gauge_objects):
gt = gauge_topics[i]
val = g.get()
client.publish(gt["state"], str(round(val, 1)), retain=True)
client.publish(gt["status"], "online", retain=True)
info(f"Gauge {i} state: {val:.1f} step={g._current_step}")
# ---------------------------------------------------------------------------
@@ -591,11 +675,12 @@ def main():
connect_wifi(WIFI_SSID, WIFI_PASSWORD)
info("Zeroing gauge on startup ...")
gauge.zero()
info("Zeroing gauges on startup ...")
for i, g in enumerate(gauge_objects):
g.zero()
info(f"Zeroed gauge {i}")
info("Zero complete")
# umqtt.robust handles reconnection automatically — just connect once
connect_mqtt()
publish_discovery(client_ref)
publish_state(client_ref)
@@ -611,13 +696,9 @@ def main():
except ImportError:
pass
global _last_rezero_ms, _bl_dirty_since
global _bl_dirty_since
last_heartbeat = utime.ticks_ms()
_last_rezero_ms = utime.ticks_ms()
target_step = gauge._val_to_step(_target_value)
# Period at which to publish state updates during movement
MOVE_STATE_INTERVAL_MS = 500
last_move_state = utime.ticks_ms()
@@ -633,45 +714,39 @@ def main():
now = utime.ticks_ms()
# Continuously move towards target at constant speed
current_target = gauge._val_to_step(_target_value)
moved = False
if current_target != gauge._current_step:
direction = 1 if current_target > gauge._current_step else -1
gauge.step(direction)
moved = True
moved_any = False
for i, g in enumerate(gauge_objects):
current_target = g._val_to_step(gauge_targets[i])
if current_target != g._current_step:
direction = 1 if current_target > g._current_step else -1
g.step(direction)
moved_any = True
# Publish state during movement at intervals
if (
moved
moved_any
and utime.ticks_diff(now, last_move_state) >= MOVE_STATE_INTERVAL_MS
):
publish_state(client_ref)
last_move_state = now
# Sleep to achieve constant speed
if moved or (
current_target == gauge._current_step
and gauge._current_step != gauge._val_to_step(_target_value)
):
if moved_any:
delay_us = 1_000_000 // MICROSTEPS_PER_SECOND
utime.sleep_us(delay_us)
# Periodic auto-rezero (disabled when interval is 0)
if (
REZERO_INTERVAL_MS > 0
and utime.ticks_diff(now, _last_rezero_ms) >= REZERO_INTERVAL_MS
and utime.ticks_diff(now, gauge_last_rezero[0]) >= REZERO_INTERVAL_MS
):
info("Auto-rezero triggered")
saved = _target_value
gauge.zero()
if saved > GAUGE_MIN:
gauge.set(saved)
for i, g in enumerate(gauge_objects):
info(f"Auto-rezero gauge {i}")
saved = gauge_targets[i]
g.zero()
if saved > gauges[i]["min"]:
g.set(saved)
gauge_last_rezero[i] = now
publish_state(client_ref)
_last_rezero_ms = now
info(f"Auto-rezero complete, restored to {saved:.1f}")
info("Auto-rezero complete")
# Retain backlight state via MQTT after settling
if (
_bl_dirty_since is not None
and utime.ticks_diff(now, _bl_dirty_since) >= _BL_SAVE_DELAY_MS
@@ -679,12 +754,8 @@ def main():
_flush_backlight(client_ref)
_bl_dirty_since = None
# Heartbeat
if utime.ticks_diff(now, last_heartbeat) >= HEARTBEAT_MS:
publish_state(client_ref)
info(
f"step={gauge._current_step} phase={gauge._phase} target_step={gauge._val_to_step(_target_value)} expected_phase={gauge._current_step % 4}"
)
last_heartbeat = now
except Exception as e: