diff --git a/CHANGELOG.md b/CHANGELOG.md index 28f3e6b..39942d1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,22 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [2.5.0] - 03/2026 + +### Added + +- **`HomiePropertyAccumulator`** — new layer that handles generic Homie v5 protocol parsing (message routing, property/target storage, dirty-node tracking) with an explicit lifecycle state machine (`HomieLifecycle`), cleanly separated from SPAN-specific + snapshot construction. +- **`$target` property support** — `SpanCircuitSnapshot` gains `relay_state_target` and `priority_target` fields, surfacing the desired-vs-actual state for relay and shed-priority commands. +- **Dirty-node snapshot caching** — `HomieDeviceConsumer.build_snapshot()` tracks which nodes changed since the last build and returns a cached snapshot when nothing is dirty, reducing per-scan CPU cost on constrained hardware. + +### Changed + +- **Layered Homie consumer architecture** — `HomieDeviceConsumer` no longer handles protocol plumbing. It reads from `HomiePropertyAccumulator` via a query API (`get_prop`, `get_target`, `nodes_by_type`, etc.) and focuses solely on SPAN domain + interpretation: power sign normalization, DSM derivation, unmapped tab synthesis, and snapshot assembly. +- **`SpanMqttClient` composes both layers** — `connect()` creates an accumulator and wires it into the consumer. The public client API is unchanged. +- **Property callbacks fire only on value change** — retained messages replaying already-known values no longer trigger callback storms on MQTT reconnect. + ## [2.4.2] - 03/2026 ### Fixed diff --git a/README.md b/README.md index 52f5f91..521b40a 100644 --- a/README.md +++ b/README.md @@ -31,14 +31,18 @@ pip install span-panel-api - `httpx` — v2 authentication and detection endpoints - `paho-mqtt` — MQTT/Homie transport (real-time push) -- `pyyaml` — simulation configuration +- `pyyaml` — YAML parsing for configuration and API payloads ## Architecture ### Transport -The `SpanMqttClient` connects to the panel's MQTT broker (MQTTS or WebSocket) and subscribes to the Homie device tree. A `HomieDeviceConsumer` state machine parses incoming topic updates into typed `SpanPanelSnapshot` dataclasses. Changes are pushed to -consumers via callbacks. +The `SpanMqttClient` connects to the panel's MQTT broker (MQTTS or WebSocket) and subscribes to the Homie device tree. A two-layer architecture separates generic Homie v5 protocol handling from SPAN-specific interpretation: + +- **`HomiePropertyAccumulator`** — handles message routing, property and `$target` storage, dirty-node tracking, and an explicit lifecycle state machine (`HomieLifecycle`). Protocol-only; no SPAN domain knowledge. +- **`HomieDeviceConsumer`** — reads from the accumulator via a query API and builds typed `SpanPanelSnapshot` dataclasses. Handles power sign normalization, DSM derivation, unmapped tab synthesis, and dirty-node-aware snapshot caching. + +Changes are pushed to consumers via callbacks. Dirty-node tracking allows the snapshot builder to skip unchanged nodes, reducing per-scan CPU cost on constrained hardware. ### Event-Loop-Driven I/O (Home Assistant Compatible) @@ -71,6 +75,7 @@ The library defines three structural subtyping protocols (PEP 544) that both the | -------------------------- | ------------------------------------------------------------------------------------- | | `SpanPanelClientProtocol` | Core lifecycle: `connect`, `close`, `ping`, `get_snapshot` | | `CircuitControlProtocol` | Relay and shed-priority control: `set_circuit_relay`, `set_circuit_priority` | +| `PanelControlProtocol` | Panel-level control: `set_dominant_power_source` | | `StreamingCapableProtocol` | Push-based updates: `register_snapshot_callback`, `start_streaming`, `stop_streaming` | Integration code programs against these protocols, not transport-specific classes. @@ -82,7 +87,7 @@ All panel state is represented as immutable, frozen dataclasses: | Dataclass | Content | | --------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------- | | `SpanPanelSnapshot` | Complete panel state: power, energy, grid/DSM state, hardware status, per-leg voltages, power flows, lugs current, circuits, battery, PV, EVSE | -| `SpanCircuitSnapshot` | Per-circuit: power, energy, relay state, priority, tabs, device type, breaker rating, current | +| `SpanCircuitSnapshot` | Per-circuit: power, energy, relay state, priority, tabs, device type, breaker rating, current, `$target` pending state | | `SpanBatterySnapshot` | BESS: SoC percentage, SoE kWh, vendor/product metadata, nameplate capacity | | `SpanPVSnapshot` | PV inverter: vendor/product metadata, nameplate capacity | | `SpanEvseSnapshot` | EVSE (EV charger): status, lock state, advertised current, vendor/product/serial/version metadata | @@ -178,6 +183,40 @@ client = await create_span_client( ) ``` +### Direct Client Construction + +Consumers that manage their own registration and broker configuration can instantiate `SpanMqttClient` directly: + +```python +from span_panel_api import SpanMqttClient, MqttClientConfig + +config = MqttClientConfig( + broker_host="192.168.1.100", + username="stored-username", + password="stored-password", + mqtts_port=8883, + ws_port=9001, + wss_port=443, +) + +client = SpanMqttClient( + host="192.168.1.100", + serial_number="nj-2316-XXXX", + broker_config=config, + snapshot_interval=1.0, +) +await client.connect() +``` + +### Scan Frequency + +`set_snapshot_interval()` controls how often push-mode snapshot callbacks fire. Lower values mean lower latency; higher values reduce CPU usage on constrained hardware. Dirty-node caching (v2.5.0) further reduces per-scan cost by skipping unchanged nodes. + +```python +# Reduce snapshot frequency to every 2 seconds +client.set_snapshot_interval(2.0) +``` + ### Circuit Control ```python @@ -189,6 +228,18 @@ await client.set_circuit_relay("circuit-uuid", "CLOSED") await client.set_circuit_priority("circuit-uuid", "NEVER") ``` +### Pending-State Detection + +When the panel publishes Homie `$target` properties, `SpanCircuitSnapshot` exposes the desired state alongside the actual state: + +```python +for cid, circuit in snapshot.circuits.items(): + if circuit.relay_state_target and circuit.relay_state_target != circuit.relay_state: + print(f" {circuit.name}: relay transitioning {circuit.relay_state} → {circuit.relay_state_target}") + if circuit.priority_target and circuit.priority_target != circuit.priority: + print(f" {circuit.name}: priority pending {circuit.priority} → {circuit.priority_target}") +``` + ### API Version Detection Detect whether a panel supports v2 (unauthenticated probe): @@ -208,7 +259,11 @@ if result.status_info: Standalone async functions for v2-specific HTTP operations: ```python -from span_panel_api import register_v2, download_ca_cert, get_homie_schema, regenerate_passphrase +from span_panel_api import ( + register_v2, download_ca_cert, get_homie_schema, + regenerate_passphrase, get_v2_status, + register_fqdn, get_fqdn, delete_fqdn, +) # Register and obtain MQTT broker credentials auth = await register_v2("192.168.1.100", "my-app", passphrase="panel-passphrase") @@ -225,21 +280,29 @@ print(f"Schema hash: {schema.types_schema_hash}") # Rotate MQTT broker password (invalidates previous password) new_password = await regenerate_passphrase("192.168.1.100", token=auth.access_token) + +# Get panel status (unauthenticated) +status = await get_v2_status("192.168.1.100") +print(f"Serial: {status.serial_number}, Firmware: {status.firmware_version}") + +# FQDN management (for panel TLS certificate SAN) +await register_fqdn("192.168.1.100", "panel.local", token=auth.access_token) +fqdn = await get_fqdn("192.168.1.100", token=auth.access_token) +await delete_fqdn("192.168.1.100", token=auth.access_token) ``` ## Error Handling All exceptions inherit from `SpanPanelError`: -| Exception | Cause | -| ------------------------------ | --------------------------------------------------------- | -| `SpanPanelAuthError` | Invalid passphrase, expired token, or missing credentials | -| `SpanPanelConnectionError` | Cannot reach the panel (network/DNS) | -| `SpanPanelTimeoutError` | Request or connection timed out | -| `SpanPanelValidationError` | Data validation failure | -| `SpanPanelAPIError` | Unexpected HTTP response from v2 endpoints | -| `SpanPanelServerError` | Panel returned HTTP 500 | -| `SimulationConfigurationError` | Invalid simulation YAML configuration | +| Exception | Cause | +| -------------------------- | --------------------------------------------------------- | +| `SpanPanelAuthError` | Invalid passphrase, expired token, or missing credentials | +| `SpanPanelConnectionError` | Cannot reach the panel (network/DNS) | +| `SpanPanelTimeoutError` | Request or connection timed out | +| `SpanPanelValidationError` | Data validation failure | +| `SpanPanelAPIError` | Unexpected HTTP response from v2 endpoints | +| `SpanPanelServerError` | Panel returned HTTP 500 | ```python from span_panel_api import SpanPanelAuthError, SpanPanelConnectionError @@ -276,14 +339,14 @@ src/span_panel_api/ ├── models.py # Snapshot dataclasses (panel, circuit, battery, PV) ├── phase_validation.py # Electrical phase utilities ├── protocol.py # PEP 544 protocols + PanelCapability flags -├── simulation.py # Simulation engine (YAML-driven, snapshot-producing) └── mqtt/ ├── __init__.py + ├── accumulator.py # HomiePropertyAccumulator (Homie v5 protocol layer) ├── async_client.py # NullLock + AsyncMQTTClient (HA core pattern) ├── client.py # SpanMqttClient (all three protocols) ├── connection.py # AsyncMqttBridge (event-loop-driven, no threads) ├── const.py # MQTT/Homie constants + UUID helpers - ├── homie.py # HomieDeviceConsumer (Homie v5 state machine) + ├── homie.py # HomieDeviceConsumer (SPAN snapshot builder) └── models.py # MqttClientConfig, MqttTransport ``` diff --git a/pyproject.toml b/pyproject.toml index 3544bcb..2a9a503 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "span-panel-api" -version = "2.4.2" +version = "2.5.0" description = "A client library for SPAN Panel API" authors = [ {name = "SpanPanel"} @@ -45,7 +45,7 @@ requires = ["hatchling"] build-backend = "hatchling.build" [tool.hatch.build.targets.wheel] -packages = ["src/span_panel_api"] +packages = ["src/span_panel_api", "scripts"] [tool.ruff] line-length = 125 diff --git a/src/span_panel_api/__init__.py b/src/span_panel_api/__init__.py index c915e0c..073b142 100644 --- a/src/span_panel_api/__init__.py +++ b/src/span_panel_api/__init__.py @@ -37,7 +37,7 @@ V2HomieSchema, V2StatusInfo, ) -from .mqtt import MqttClientConfig, SpanMqttClient +from .mqtt import HomieLifecycle, HomiePropertyAccumulator, MqttClientConfig, SpanMqttClient from .phase_validation import ( PhaseDistribution, are_tabs_opposite_phase, @@ -54,7 +54,7 @@ StreamingCapableProtocol, ) -__version__ = "2.4.0" +__version__ = "2.5.0" # fmt: off __all__ = [ # noqa: RUF022 # Protocols @@ -90,6 +90,8 @@ "regenerate_passphrase", "register_v2", # Transport + "HomieLifecycle", + "HomiePropertyAccumulator", "MqttClientConfig", "SpanMqttClient", # Phase validation diff --git a/src/span_panel_api/models.py b/src/span_panel_api/models.py index 010ac21..4279f5c 100644 --- a/src/span_panel_api/models.py +++ b/src/span_panel_api/models.py @@ -42,6 +42,8 @@ class SpanCircuitSnapshot: relay_requester: str = "UNKNOWN" # v2 new: circuit/relay-requester energy_accum_update_time_s: int = 0 # v1: poll timestamp | v2: MQTT arrival time instant_power_update_time_s: int = 0 # v1: poll timestamp | v2: MQTT arrival time + relay_state_target: str | None = None # v2: $target for relay (desired state) + priority_target: str | None = None # v2: $target for shed-priority (desired state) @dataclass(frozen=True, slots=True) diff --git a/src/span_panel_api/mqtt/__init__.py b/src/span_panel_api/mqtt/__init__.py index 464db69..6eaebeb 100644 --- a/src/span_panel_api/mqtt/__init__.py +++ b/src/span_panel_api/mqtt/__init__.py @@ -1,5 +1,6 @@ """SPAN Panel MQTT/Homie transport.""" +from .accumulator import HomieLifecycle, HomiePropertyAccumulator from .async_client import AsyncMQTTClient from .client import SpanMqttClient from .connection import AsyncMqttBridge @@ -10,6 +11,8 @@ "AsyncMQTTClient", "AsyncMqttBridge", "HomieDeviceConsumer", + "HomieLifecycle", + "HomiePropertyAccumulator", "MqttClientConfig", "SpanMqttClient", ] diff --git a/src/span_panel_api/mqtt/accumulator.py b/src/span_panel_api/mqtt/accumulator.py new file mode 100644 index 0000000..a102ac8 --- /dev/null +++ b/src/span_panel_api/mqtt/accumulator.py @@ -0,0 +1,273 @@ +"""Homie v5 property accumulator with lifecycle state machine. + +Generic Homie protocol layer that knows nothing about SPAN-specific +concepts. Stores property values, tracks lifecycle, and exposes a +query API for higher-level consumers. +""" + +from __future__ import annotations + +from collections.abc import Callable +import enum +import json +import logging +import time + +from .const import HOMIE_STATE_DISCONNECTED, HOMIE_STATE_LOST, HOMIE_STATE_READY, TOPIC_PREFIX + +_LOGGER = logging.getLogger(__name__) + +# Callback signature: (node_id, prop_id, value, old_value) +PropertyCallback = Callable[[str, str, str, str | None], None] + + +class HomieLifecycle(enum.Enum): + """Homie device lifecycle states.""" + + DISCONNECTED = "disconnected" + CONNECTED = "connected" + DESCRIPTION_RECEIVED = "description_received" + READY = "ready" + + +class HomiePropertyAccumulator: + """Accumulate Homie v5 property values and track device lifecycle. + + Topic prefix: ``ebus/5/{serial_number}`` + + All methods must be called from the asyncio event loop thread. + """ + + def __init__(self, serial_number: str) -> None: + self._serial_number = serial_number + self._topic_prefix = f"{TOPIC_PREFIX}/{serial_number}" + + # Lifecycle + self._lifecycle = HomieLifecycle.DISCONNECTED + self._received_state_ready = False + self._received_description = False + self._ready_since: float = 0.0 + + # Property storage + self._property_values: dict[str, dict[str, str]] = {} + self._property_timestamps: dict[str, dict[str, int]] = {} + self._target_values: dict[str, dict[str, str]] = {} + + # Node type mapping from $description + self._node_types: dict[str, str] = {} + + # Dirty tracking + self._dirty_nodes: set[str] = set() + + # Callbacks + self._property_callbacks: list[PropertyCallback] = [] + + # -- Public properties --------------------------------------------------- + + @property + def serial_number(self) -> str: + """Serial number this accumulator tracks.""" + return self._serial_number + + @property + def lifecycle(self) -> HomieLifecycle: + """Current lifecycle state.""" + return self._lifecycle + + @property + def ready_since(self) -> float: + """Monotonic timestamp of the last READY transition, 0.0 if never ready.""" + return self._ready_since + + def is_ready(self) -> bool: + """True when lifecycle is READY.""" + return self._lifecycle == HomieLifecycle.READY + + # -- Message routing ----------------------------------------------------- + + def handle_message(self, topic: str, payload: str) -> None: + """Route an MQTT message to the appropriate handler.""" + prefix_with_sep = f"{self._topic_prefix}/" + if not topic.startswith(prefix_with_sep): + return + + suffix = topic[len(prefix_with_sep) :] + + if suffix == "$state": + self._handle_state(payload) + elif suffix == "$description": + self._handle_description(payload) + elif suffix.endswith("/set"): + return # ignore /set topics + elif "/" in suffix: + parts = suffix.split("/", 1) + node_id = parts[0] + prop_part = parts[1] + if prop_part.endswith("/$target"): + # Target value: {node_id}/{prop_id}/$target + prop_id = prop_part[: -len("/$target")] + self._handle_target(node_id, prop_id, payload) + else: + # Reported value: {node_id}/{prop_id} + self._handle_property(node_id, prop_part, payload) + + # -- Query API ----------------------------------------------------------- + + def get_prop(self, node_id: str, prop_id: str, default: str = "") -> str: + """Get a property's reported value.""" + return self._property_values.get(node_id, {}).get(prop_id, default) + + def get_timestamp(self, node_id: str, prop_id: str) -> int: + """Get the epoch timestamp of a property's last update.""" + return self._property_timestamps.get(node_id, {}).get(prop_id, 0) + + def get_target(self, node_id: str, prop_id: str) -> str | None: + """Get a property's target value, or None if no target set.""" + return self._target_values.get(node_id, {}).get(prop_id) + + def has_target(self, node_id: str, prop_id: str) -> bool: + """True if a target value exists for the given property.""" + return prop_id in self._target_values.get(node_id, {}) + + def find_node_by_type(self, type_str: str) -> str | None: + """Find the first node ID matching a given type string.""" + for node_id, node_type in self._node_types.items(): + if node_type == type_str: + return node_id + return None + + def nodes_by_type(self, type_str: str) -> list[str]: + """Return all node IDs matching a given type string.""" + return [nid for nid, ntype in self._node_types.items() if ntype == type_str] + + def get_node_type(self, node_id: str) -> str: + """Get the type string for a node, or empty string if unknown.""" + return self._node_types.get(node_id, "") + + def all_node_types(self) -> dict[str, str]: + """Return a copy of the node_id → type mapping.""" + return dict(self._node_types) + + def dirty_node_ids(self) -> frozenset[str]: + """Return the set of node IDs with changed properties since last mark_clean.""" + return frozenset(self._dirty_nodes) + + def mark_clean(self) -> None: + """Clear the dirty set.""" + self._dirty_nodes.clear() + + def register_property_callback(self, callback: PropertyCallback) -> Callable[[], None]: + """Register a callback fired on property value changes. + + Callback signature: (node_id, prop_id, new_value, old_value). + Only fires when the value actually changes, not on every message. + + Returns an unregister function. + """ + self._property_callbacks.append(callback) + + def unregister() -> None: + try: + self._property_callbacks.remove(callback) + except ValueError: + _LOGGER.debug("Callback already unregistered") + + return unregister + + # -- Internal handlers --------------------------------------------------- + + def _handle_state(self, payload: str) -> None: + """Handle $state topic and drive lifecycle transitions.""" + if payload == HOMIE_STATE_READY: + self._received_state_ready = True + if self._received_description: + self._transition_to_ready() + else: + # State ready but no description yet + if self._lifecycle == HomieLifecycle.DISCONNECTED: + self._lifecycle = HomieLifecycle.CONNECTED + elif payload in (HOMIE_STATE_DISCONNECTED, HOMIE_STATE_LOST): + self._lifecycle = HomieLifecycle.DISCONNECTED + self._received_state_ready = False + self._received_description = False + else: + # init, sleeping, alert, etc. — connected but not ready + if self._lifecycle == HomieLifecycle.DISCONNECTED: + self._lifecycle = HomieLifecycle.CONNECTED + self._received_state_ready = False + + _LOGGER.debug("Homie $state: %s → lifecycle=%s", payload, self._lifecycle.value) + + def _handle_description(self, payload: str) -> None: + """Parse $description JSON and extract node type mappings.""" + try: + desc = json.loads(payload) + except json.JSONDecodeError: + _LOGGER.warning("Invalid $description JSON") + return + + self._received_description = True + self._node_types.clear() + + nodes = desc.get("nodes", {}) + if isinstance(nodes, dict): + for node_id, node_def in nodes.items(): + if isinstance(node_def, dict): + node_type = node_def.get("type", "") + if isinstance(node_type, str): + self._node_types[str(node_id)] = node_type + + # Mark all known nodes dirty + self._dirty_nodes.update(self._node_types.keys()) + + _LOGGER.debug("Parsed $description with %d nodes", len(self._node_types)) + + # Lifecycle transition + if self._received_state_ready: + self._transition_to_ready() + elif self._lifecycle in (HomieLifecycle.DISCONNECTED, HomieLifecycle.CONNECTED): + self._lifecycle = HomieLifecycle.DESCRIPTION_RECEIVED + + def _handle_property(self, node_id: str, prop_id: str, value: str) -> None: + """Handle a reported property value update.""" + now_s = int(time.time()) + + if node_id not in self._property_values: + self._property_values[node_id] = {} + self._property_timestamps[node_id] = {} + + old_value = self._property_values[node_id].get(prop_id) + + if old_value == value: + return # no change — no dirty, no callbacks, no timestamp bump + + self._property_values[node_id][prop_id] = value + self._property_timestamps[node_id][prop_id] = now_s + self._dirty_nodes.add(node_id) + + self._fire_callbacks(node_id, prop_id, value, old_value) + + def _handle_target(self, node_id: str, prop_id: str, value: str) -> None: + """Handle a $target property value.""" + if node_id not in self._target_values: + self._target_values[node_id] = {} + + old_target = self._target_values[node_id].get(prop_id) + if old_target == value: + return # no change + + self._target_values[node_id][prop_id] = value + self._dirty_nodes.add(node_id) + + def _transition_to_ready(self) -> None: + """Transition lifecycle to READY.""" + self._lifecycle = HomieLifecycle.READY + self._ready_since = time.monotonic() + + def _fire_callbacks(self, node_id: str, prop_id: str, value: str, old_value: str | None) -> None: + """Fire all registered property callbacks, catching exceptions.""" + for cb in self._property_callbacks: + try: + cb(node_id, prop_id, value, old_value) + except Exception: # pylint: disable=broad-exception-caught + _LOGGER.debug("Property callback error for %s/%s", node_id, prop_id, exc_info=True) diff --git a/src/span_panel_api/mqtt/client.py b/src/span_panel_api/mqtt/client.py index 8300564..81c940b 100644 --- a/src/span_panel_api/mqtt/client.py +++ b/src/span_panel_api/mqtt/client.py @@ -15,6 +15,7 @@ from ..exceptions import SpanPanelConnectionError, SpanPanelServerError from ..models import FieldMetadata, HomieSchemaTypes, SpanPanelSnapshot from ..protocol import PanelCapability +from .accumulator import HomiePropertyAccumulator from .connection import AsyncMqttBridge from .const import MQTT_READY_TIMEOUT_S, PROPERTY_SET_TOPIC_FMT, TYPE_CORE, WILDCARD_TOPIC_FMT from .field_metadata import build_field_metadata, log_schema_drift @@ -47,6 +48,7 @@ def __init__( self._panel_http_port = panel_http_port self._bridge: AsyncMqttBridge | None = None + self._accumulator: HomiePropertyAccumulator | None = None self._homie: HomieDeviceConsumer | None = None self._streaming = False self._snapshot_callbacks: list[Callable[[SpanPanelSnapshot], Awaitable[None]]] = [] @@ -109,7 +111,8 @@ async def connect(self) -> None: # Fetch schema to determine panel size and build field metadata schema = await get_homie_schema(self._host, port=self._panel_http_port) - self._homie = HomieDeviceConsumer(self._serial_number, schema.panel_size) + self._accumulator = HomiePropertyAccumulator(self._serial_number) + self._homie = HomieDeviceConsumer(self._accumulator, schema.panel_size) # Detect schema drift from previous connection new_hash = schema.types_schema_hash @@ -186,6 +189,7 @@ async def close(self) -> None: if self._bridge is not None: await self._bridge.disconnect() self._bridge = None + self._accumulator = None async def ping(self) -> bool: """Check if MQTT connection is alive and device is ready.""" diff --git a/src/span_panel_api/mqtt/homie.py b/src/span_panel_api/mqtt/homie.py index 828e684..1c365c6 100644 --- a/src/span_panel_api/mqtt/homie.py +++ b/src/span_panel_api/mqtt/homie.py @@ -1,24 +1,22 @@ """Homie v5 device consumer for SPAN Panel. -Parses Homie device description and tracks property values from MQTT -messages. Builds transport-agnostic SpanPanelSnapshot from the -accumulated state. +Builds transport-agnostic SpanPanelSnapshot from the accumulated +Homie property state stored in a HomiePropertyAccumulator. """ from __future__ import annotations from collections.abc import Callable -import json +import dataclasses import logging import time from typing import ClassVar from ..models import SpanBatterySnapshot, SpanCircuitSnapshot, SpanEvseSnapshot, SpanPanelSnapshot, SpanPVSnapshot +from .accumulator import HomiePropertyAccumulator from .const import ( - HOMIE_STATE_READY, LUGS_DOWNSTREAM, LUGS_UPSTREAM, - TOPIC_PREFIX, TYPE_BESS, TYPE_CIRCUIT, TYPE_CORE, @@ -33,9 +31,6 @@ _LOGGER = logging.getLogger(__name__) -# Callback signature: (node_id, prop_id, value, old_value) -PropertyCallback = Callable[[str, str, str, str | None], None] - def _parse_bool(value: str) -> bool: """Parse a Homie boolean string.""" @@ -59,142 +54,111 @@ def _parse_int(value: str, default: int = 0) -> int: class HomieDeviceConsumer: - """Parse Homie device description and track property values. + """Build SPAN-specific snapshots from accumulated Homie property state. All methods must be called from the asyncio event loop thread (guaranteed by AsyncMqttBridge's call_soon_threadsafe dispatch). """ - def __init__(self, serial_number: str, panel_size: int) -> None: - self._serial_number = serial_number + def __init__(self, accumulator: HomiePropertyAccumulator, panel_size: int) -> None: + self._acc = accumulator self._panel_size = panel_size - self._topic_prefix = f"{TOPIC_PREFIX}/{serial_number}" + self._cached_snapshot: SpanPanelSnapshot | None = None - self._state: str = "" - self._description: dict[str, object] | None = None - self._property_values: dict[str, dict[str, str]] = {} - self._property_timestamps: dict[str, dict[str, int]] = {} - self._node_types: dict[str, str] = {} - self._ready_since: float = 0.0 - self._property_callbacks: list[PropertyCallback] = [] + # -- Delegation to accumulator ------------------------------------------- + # These thin wrappers allow SpanMqttClient (and legacy test code) to + # continue calling is_ready / handle_message / find_node_by_type / + # register_property_callback on the consumer directly, without requiring + # callers to hold a separate reference to the accumulator. def is_ready(self) -> bool: - """True when $state == ready and $description has been parsed.""" - return self._state == HOMIE_STATE_READY and self._description is not None + """Delegate to accumulator.is_ready().""" + return self._acc.is_ready() + + def handle_message(self, topic: str, payload: str) -> None: + """Delegate to accumulator.handle_message().""" + self._acc.handle_message(topic, payload) + + def find_node_by_type(self, type_str: str) -> str | None: + """Delegate to accumulator.find_node_by_type().""" + return self._acc.find_node_by_type(type_str) + + def register_property_callback( + self, + callback: Callable[[str, str, str, str | None], None], + ) -> Callable[[], None]: + """Delegate to accumulator.register_property_callback().""" + return self._acc.register_property_callback(callback) def circuit_nodes_missing_names(self) -> list[str]: """Return circuit-like node IDs that have no ``name`` property yet.""" missing: list[str] = [] - for node_id, node_type in self._node_types.items(): - if node_type in self._CIRCUIT_LIKE_TYPES: - name = self._property_values.get(node_id, {}).get("name") - if not name: # None or "" - missing.append(node_id) + for node_id in self._acc.nodes_by_type(TYPE_CIRCUIT): + if not self._acc.get_prop(node_id, "name"): + missing.append(node_id) return missing - def handle_message(self, topic: str, payload: str) -> None: - """Route an MQTT message to the appropriate handler.""" - if not topic.startswith(self._topic_prefix): - return + # Node types that affect panel-level snapshot fields. + # Any dirty node of these types triggers a full rebuild. + _PANEL_LEVEL_TYPES: ClassVar[frozenset[str]] = frozenset( + { + TYPE_CORE, + TYPE_LUGS, + TYPE_LUGS_UPSTREAM, + TYPE_LUGS_DOWNSTREAM, + TYPE_BESS, + TYPE_PV, + TYPE_EVSE, + TYPE_POWER_FLOWS, + } + ) - suffix = topic[len(self._topic_prefix) + 1 :] # strip prefix + "/" + def build_snapshot(self) -> SpanPanelSnapshot: + """Build a point-in-time snapshot, using cache when possible. - if suffix == "$state": - self._handle_state(payload) - elif suffix == "$description": - self._handle_description(payload) - elif "/" in suffix and not suffix.endswith("/set"): - # Property value: {node_id}/{property_id} - parts = suffix.split("/", 1) - self._handle_property(parts[0], parts[1], payload) + Must be called after accumulator is_ready() returns True. + """ + dirty = self._acc.dirty_node_ids() - def register_property_callback(self, callback: PropertyCallback) -> Callable[[], None]: - """Register callback(node_id, prop_id, value, old_value). + if not dirty and self._cached_snapshot is not None: + self._acc.mark_clean() + return self._cached_snapshot - Returns an unregister function. - """ - self._property_callbacks.append(callback) + node_types = self._acc.all_node_types() + needs_full = self._cached_snapshot is None or any( + node_types.get(nid, "") in self._PANEL_LEVEL_TYPES or nid not in node_types for nid in dirty + ) - def unregister() -> None: - try: - self._property_callbacks.remove(callback) - except ValueError: - _LOGGER.debug("Callback already unregistered") + snapshot = self._build_snapshot() if needs_full else self._rebuild_dirty_circuits(dirty) + self._cached_snapshot = snapshot + self._acc.mark_clean() + return snapshot - return unregister + def _rebuild_dirty_circuits(self, dirty: frozenset[str]) -> SpanPanelSnapshot: + """Partial rebuild — only rebuild circuits whose nodes are dirty.""" + assert self._cached_snapshot is not None + cached = self._cached_snapshot - def build_snapshot(self) -> SpanPanelSnapshot: - """Build a point-in-time snapshot from current property values. + feed_metadata = self._build_feed_metadata() + updated_circuits: dict[str, SpanCircuitSnapshot] = {} + # Keep only non-unmapped circuits from cache, rebuild dirty ones + for cid, circ in cached.circuits.items(): + if cid.startswith("unmapped_tab_"): + continue # drop old unmapped entries; will recompute below + updated_circuits[cid] = circ + for node_id in dirty: + if self._is_circuit_node(node_id): + meta = feed_metadata.get(node_id, {}) + device_type = meta.get("device_type", "circuit") + relative_position = meta.get("relative_position", "") + circuit = self._build_circuit(node_id, device_type, relative_position) + updated_circuits[circuit.circuit_id] = circuit - Must be called after is_ready() returns True. - """ - return self._build_snapshot() - - # -- Internal handlers ------------------------------------------------- - - def _handle_state(self, payload: str) -> None: - """Handle $state topic.""" - self._state = payload - if payload == HOMIE_STATE_READY and self._ready_since == 0.0: - self._ready_since = time.monotonic() - _LOGGER.debug("Homie $state: %s", payload) - - def _handle_description(self, payload: str) -> None: - """Parse $description JSON and extract node type mappings.""" - try: - desc = json.loads(payload) - except json.JSONDecodeError: - _LOGGER.warning("Invalid $description JSON") - return - - self._description = desc - self._node_types.clear() - - # Extract node types from description - nodes = desc.get("nodes", {}) - if isinstance(nodes, dict): - for node_id, node_def in nodes.items(): - if isinstance(node_def, dict): - node_type = node_def.get("type", "") - if isinstance(node_type, str): - self._node_types[str(node_id)] = node_type - - _LOGGER.debug("Parsed $description with %d nodes", len(self._node_types)) - - def _handle_property(self, node_id: str, prop_id: str, value: str) -> None: - """Handle a property value update.""" - now_s = int(time.time()) - - if node_id not in self._property_values: - self._property_values[node_id] = {} - self._property_timestamps[node_id] = {} - - old_value = self._property_values[node_id].get(prop_id) - self._property_values[node_id][prop_id] = value - self._property_timestamps[node_id][prop_id] = now_s - - for cb in self._property_callbacks: - try: - cb(node_id, prop_id, value, old_value) - except Exception: # pylint: disable=broad-exception-caught - _LOGGER.debug("Property callback error for %s/%s", node_id, prop_id) - - # -- Snapshot building -------------------------------------------------- - - def _get_prop(self, node_id: str, prop_id: str, default: str = "") -> str: - """Get a property value.""" - return self._property_values.get(node_id, {}).get(prop_id, default) - - def _get_timestamp(self, node_id: str, prop_id: str) -> int: - """Get a property timestamp.""" - return self._property_timestamps.get(node_id, {}).get(prop_id, 0) - - def find_node_by_type(self, type_string: str) -> str | None: - """Find the first node ID matching a given type.""" - for node_id, node_type in self._node_types.items(): - if node_type == type_string: - return node_id - return None + # Recompute unmapped tabs based on current circuit set + unmapped = self._build_unmapped_tabs(updated_circuits) + updated_circuits.update(unmapped) + + return dataclasses.replace(cached, circuits=updated_circuits) def _find_lugs_node(self, direction: str) -> str | None: """Find the lugs node with a specific direction. @@ -210,14 +174,16 @@ def _find_lugs_node(self, direction: str) -> str | None: } target_type = typed_map.get(direction) if target_type: - for node_id, node_type in self._node_types.items(): + node_types = self._acc.all_node_types() + for node_id, node_type in node_types.items(): if node_type == target_type: return node_id # Generic variant (single TYPE_LUGS with direction property) - for node_id, node_type in self._node_types.items(): + node_types = self._acc.all_node_types() + for node_id, node_type in node_types.items(): if node_type == TYPE_LUGS: - prop_dir = self._property_values.get(node_id, {}).get("direction", "") + prop_dir = self._acc.get_prop(node_id, "direction") if prop_dir.upper() == direction: return node_id return None @@ -237,7 +203,7 @@ def _find_lugs_node(self, direction: str) -> str | None: def _is_circuit_node(self, node_id: str) -> bool: """Check if node is a circuit device.""" - return self._node_types.get(node_id, "") in self._CIRCUIT_LIKE_TYPES + return self._acc.get_node_type(node_id) in self._CIRCUIT_LIKE_TYPES def _build_feed_metadata(self) -> dict[str, dict[str, str]]: """Build mapping of circuit node_id → metadata from PV/EVSE feed references. @@ -247,12 +213,12 @@ def _build_feed_metadata(self) -> dict[str, dict[str, str]]: - relative_position: "IN_PANEL" | "UPSTREAM" | "DOWNSTREAM" | "" """ feed_meta: dict[str, dict[str, str]] = {} - for node_id, node_type in self._node_types.items(): + for node_id, node_type in self._acc.all_node_types().items(): device_type = self._FEED_TYPE_MAP.get(node_type) if device_type: - feed_circuit = self._get_prop(node_id, "feed") + feed_circuit = self._acc.get_prop(node_id, "feed") if feed_circuit: - rel_pos = self._get_prop(node_id, "relative-position") + rel_pos = self._acc.get_prop(node_id, "relative-position") feed_meta[feed_circuit] = { "device_type": device_type, "relative_position": rel_pos.upper() if rel_pos else "", @@ -264,76 +230,80 @@ def _build_circuit(self, node_id: str, device_type: str = "circuit", relative_po circuit_id = normalize_circuit_id(node_id) # active-power is in watts; negate so positive = consumption - raw_power_w = _parse_float(self._get_prop(node_id, "active-power")) + raw_power_w = _parse_float(self._acc.get_prop(node_id, "active-power")) instant_power_w = -raw_power_w or 0.0 # Energy: exported-energy = consumption (panel exports TO circuit) - consumed_wh = _parse_float(self._get_prop(node_id, "exported-energy")) + consumed_wh = _parse_float(self._acc.get_prop(node_id, "exported-energy")) # imported-energy = production (panel imports FROM circuit) - produced_wh = _parse_float(self._get_prop(node_id, "imported-energy")) + produced_wh = _parse_float(self._acc.get_prop(node_id, "imported-energy")) # Tabs: derived from space + dipole # Dipole circuits occupy two consecutive spaces on the same bus bar # side: [space, space + 2] (odd+odd or even+even) - space_val = self._get_prop(node_id, "space") - is_dipole = _parse_bool(self._get_prop(node_id, "dipole")) + space_val = self._acc.get_prop(node_id, "space") + is_dipole = _parse_bool(self._acc.get_prop(node_id, "dipole")) tabs: list[int] = [] if space_val: space = _parse_int(space_val) tabs = [space, space + 2] if is_dipole else [space] - always_on = _parse_bool(self._get_prop(node_id, "always-on")) + always_on = _parse_bool(self._acc.get_prop(node_id, "always-on")) # Timestamps from MQTT arrival time energy_ts = max( - self._get_timestamp(node_id, "exported-energy"), - self._get_timestamp(node_id, "imported-energy"), + self._acc.get_timestamp(node_id, "exported-energy"), + self._acc.get_timestamp(node_id, "imported-energy"), ) - power_ts = self._get_timestamp(node_id, "active-power") + power_ts = self._acc.get_timestamp(node_id, "active-power") return SpanCircuitSnapshot( circuit_id=circuit_id, - name=self._get_prop(node_id, "name"), - relay_state=self._get_prop(node_id, "relay", "UNKNOWN"), + name=self._acc.get_prop(node_id, "name"), + relay_state=self._acc.get_prop(node_id, "relay", "UNKNOWN"), instant_power_w=instant_power_w, produced_energy_wh=produced_wh, consumed_energy_wh=consumed_wh, tabs=tabs, - priority=self._get_prop(node_id, "shed-priority", "UNKNOWN"), + priority=self._acc.get_prop(node_id, "shed-priority", "UNKNOWN"), is_user_controllable=not always_on, - is_sheddable=_parse_bool(self._get_prop(node_id, "sheddable")), - is_never_backup=_parse_bool(self._get_prop(node_id, "never-backup")), + is_sheddable=_parse_bool(self._acc.get_prop(node_id, "sheddable")), + is_never_backup=_parse_bool(self._acc.get_prop(node_id, "never-backup")), device_type=device_type, relative_position=relative_position, - is_240v=_parse_bool(self._get_prop(node_id, "dipole")), - current_a=_parse_float(self._get_prop(node_id, "current")) if self._get_prop(node_id, "current") else None, + is_240v=_parse_bool(self._acc.get_prop(node_id, "dipole")), + current_a=( + _parse_float(self._acc.get_prop(node_id, "current")) if self._acc.get_prop(node_id, "current") else None + ), breaker_rating_a=( - _parse_float(self._get_prop(node_id, "breaker-rating")) - if self._get_prop(node_id, "breaker-rating") + _parse_float(self._acc.get_prop(node_id, "breaker-rating")) + if self._acc.get_prop(node_id, "breaker-rating") else None ), always_on=always_on, - relay_requester=self._get_prop(node_id, "relay-requester", "UNKNOWN"), + relay_requester=self._acc.get_prop(node_id, "relay-requester", "UNKNOWN"), energy_accum_update_time_s=energy_ts, instant_power_update_time_s=power_ts, + relay_state_target=self._acc.get_target(node_id, "relay"), + priority_target=self._acc.get_target(node_id, "shed-priority"), ) def _build_battery(self) -> SpanBatterySnapshot: """Build battery snapshot from BESS node.""" - bess_node = self.find_node_by_type(TYPE_BESS) + bess_node = self._acc.find_node_by_type(TYPE_BESS) if bess_node is None: return SpanBatterySnapshot() - soc_str = self._get_prop(bess_node, "soc") - soe_str = self._get_prop(bess_node, "soe") + soc_str = self._acc.get_prop(bess_node, "soc") + soe_str = self._acc.get_prop(bess_node, "soe") - vn = self._get_prop(bess_node, "vendor-name") - pn = self._get_prop(bess_node, "product-name") - mdl = self._get_prop(bess_node, "model") - sn = self._get_prop(bess_node, "serial-number") - sw = self._get_prop(bess_node, "software-version") - nc = self._get_prop(bess_node, "nameplate-capacity") - conn = self._get_prop(bess_node, "connected") + vn = self._acc.get_prop(bess_node, "vendor-name") + pn = self._acc.get_prop(bess_node, "product-name") + mdl = self._acc.get_prop(bess_node, "model") + sn = self._acc.get_prop(bess_node, "serial-number") + sw = self._acc.get_prop(bess_node, "software-version") + nc = self._acc.get_prop(bess_node, "nameplate-capacity") + conn = self._acc.get_prop(bess_node, "connected") return SpanBatterySnapshot( soe_percentage=_parse_float(soc_str) if soc_str else None, @@ -349,15 +319,15 @@ def _build_battery(self) -> SpanBatterySnapshot: def _build_pv(self) -> SpanPVSnapshot: """Build PV snapshot from the first PV metadata node.""" - pv_node = self.find_node_by_type(TYPE_PV) + pv_node = self._acc.find_node_by_type(TYPE_PV) if pv_node is None: return SpanPVSnapshot() - vn = self._get_prop(pv_node, "vendor-name") - pn = self._get_prop(pv_node, "product-name") - nc = self._get_prop(pv_node, "nameplate-capacity") - feed = self._get_prop(pv_node, "feed") - rel_pos = self._get_prop(pv_node, "relative-position") + vn = self._acc.get_prop(pv_node, "vendor-name") + pn = self._acc.get_prop(pv_node, "product-name") + nc = self._acc.get_prop(pv_node, "nameplate-capacity") + feed = self._acc.get_prop(pv_node, "feed") + rel_pos = self._acc.get_prop(pv_node, "relative-position") return SpanPVSnapshot( vendor_name=vn if vn else None, @@ -370,24 +340,24 @@ def _build_pv(self) -> SpanPVSnapshot: def _build_evse_devices(self) -> dict[str, SpanEvseSnapshot]: """Build EVSE snapshots from all EVSE metadata nodes.""" result: dict[str, SpanEvseSnapshot] = {} - for node_id, node_type in self._node_types.items(): + for node_id, node_type in self._acc.all_node_types().items(): if node_type != TYPE_EVSE: continue - feed = self._get_prop(node_id, "feed") + feed = self._acc.get_prop(node_id, "feed") if not feed: continue - adv = self._get_prop(node_id, "advertised-current") + adv = self._acc.get_prop(node_id, "advertised-current") result[node_id] = SpanEvseSnapshot( node_id=node_id, feed_circuit_id=normalize_circuit_id(feed), - status=self._get_prop(node_id, "status") or "UNKNOWN", - lock_state=self._get_prop(node_id, "lock-state") or "UNKNOWN", + status=self._acc.get_prop(node_id, "status") or "UNKNOWN", + lock_state=self._acc.get_prop(node_id, "lock-state") or "UNKNOWN", advertised_current_a=_parse_float(adv) if adv else None, - vendor_name=self._get_prop(node_id, "vendor-name") or None, - product_name=self._get_prop(node_id, "product-name") or None, - part_number=self._get_prop(node_id, "part-number") or None, - serial_number=self._get_prop(node_id, "serial-number") or None, - software_version=self._get_prop(node_id, "software-version") or None, + vendor_name=self._acc.get_prop(node_id, "vendor-name") or None, + product_name=self._acc.get_prop(node_id, "product-name") or None, + part_number=self._acc.get_prop(node_id, "part-number") or None, + serial_number=self._acc.get_prop(node_id, "serial-number") or None, + software_version=self._acc.get_prop(node_id, "software-version") or None, ) return result @@ -401,9 +371,9 @@ def _derive_dsm_state(self, core_node: str | None, grid_power: float, power_flow 4. both grid signals zero AND DPS != GRID — islanded """ # 1. BESS grid-state is authoritative when available - bess_node = self.find_node_by_type(TYPE_BESS) + bess_node = self._acc.find_node_by_type(TYPE_BESS) if bess_node is not None: - gs = self._get_prop(bess_node, "grid-state") + gs = self._acc.get_prop(bess_node, "grid-state") if gs == "ON_GRID": return "DSM_ON_GRID" if gs == "OFF_GRID": @@ -411,7 +381,7 @@ def _derive_dsm_state(self, core_node: str | None, grid_power: float, power_flow # 2-4. Fallback heuristic using DPS and grid power signals if core_node is not None: - dps = self._get_prop(core_node, "dominant-power-source") + dps = self._acc.get_prop(core_node, "dominant-power-source") if dps == "GRID": return "DSM_ON_GRID" @@ -480,7 +450,7 @@ def _build_unmapped_tabs( def _build_snapshot(self) -> SpanPanelSnapshot: """Build full snapshot from accumulated property values.""" - core_node = self.find_node_by_type(TYPE_CORE) + core_node = self._acc.find_node_by_type(TYPE_CORE) upstream_lugs = self._find_lugs_node(LUGS_UPSTREAM) downstream_lugs = self._find_lugs_node(LUGS_DOWNSTREAM) @@ -500,32 +470,32 @@ def _build_snapshot(self) -> SpanPanelSnapshot: vendor_cloud: str | None = None if core_node is not None: - firmware = self._get_prop(core_node, "software-version") - door_state = self._get_prop(core_node, "door", "UNKNOWN") - main_relay = self._get_prop(core_node, "relay", "UNKNOWN") - eth0 = _parse_bool(self._get_prop(core_node, "ethernet")) - wlan = _parse_bool(self._get_prop(core_node, "wifi")) + firmware = self._acc.get_prop(core_node, "software-version") + door_state = self._acc.get_prop(core_node, "door", "UNKNOWN") + main_relay = self._acc.get_prop(core_node, "relay", "UNKNOWN") + eth0 = _parse_bool(self._acc.get_prop(core_node, "ethernet")) + wlan = _parse_bool(self._acc.get_prop(core_node, "wifi")) - vc = self._get_prop(core_node, "vendor-cloud") + vc = self._acc.get_prop(core_node, "vendor-cloud") wwan_connected = vc == "CONNECTED" vendor_cloud = vc if vc else None - dps = self._get_prop(core_node, "dominant-power-source") + dps = self._acc.get_prop(core_node, "dominant-power-source") dominant_power_source = dps if dps else None - gi = self._get_prop(core_node, "grid-islandable") + gi = self._acc.get_prop(core_node, "grid-islandable") grid_islandable = _parse_bool(gi) if gi else None - l1v = self._get_prop(core_node, "l1-voltage") + l1v = self._acc.get_prop(core_node, "l1-voltage") l1_voltage = _parse_float(l1v) if l1v else None - l2v = self._get_prop(core_node, "l2-voltage") + l2v = self._acc.get_prop(core_node, "l2-voltage") l2_voltage = _parse_float(l2v) if l2v else None - br = self._get_prop(core_node, "breaker-rating") + br = self._acc.get_prop(core_node, "breaker-rating") main_breaker = _parse_int(br) if br else None - ws = self._get_prop(core_node, "wifi-ssid") + ws = self._acc.get_prop(core_node, "wifi-ssid") wifi_ssid = ws if ws else None # Upstream lugs → main meter (grid connection) @@ -537,13 +507,13 @@ def _build_snapshot(self) -> SpanPanelSnapshot: upstream_l1_current: float | None = None upstream_l2_current: float | None = None if upstream_lugs is not None: - grid_power = _parse_float(self._get_prop(upstream_lugs, "active-power")) - main_consumed = _parse_float(self._get_prop(upstream_lugs, "imported-energy")) - main_produced = _parse_float(self._get_prop(upstream_lugs, "exported-energy")) + grid_power = _parse_float(self._acc.get_prop(upstream_lugs, "active-power")) + main_consumed = _parse_float(self._acc.get_prop(upstream_lugs, "imported-energy")) + main_produced = _parse_float(self._acc.get_prop(upstream_lugs, "exported-energy")) - l1_i = self._get_prop(upstream_lugs, "l1-current") + l1_i = self._acc.get_prop(upstream_lugs, "l1-current") upstream_l1_current = _parse_float(l1_i) if l1_i else None - l2_i = self._get_prop(upstream_lugs, "l2-current") + l2_i = self._acc.get_prop(upstream_lugs, "l2-current") upstream_l2_current = _parse_float(l2_i) if l2_i else None # Downstream lugs → feedthrough @@ -553,29 +523,29 @@ def _build_snapshot(self) -> SpanPanelSnapshot: downstream_l1_current: float | None = None downstream_l2_current: float | None = None if downstream_lugs is not None: - feedthrough_power = _parse_float(self._get_prop(downstream_lugs, "active-power")) - feedthrough_consumed = _parse_float(self._get_prop(downstream_lugs, "imported-energy")) - feedthrough_produced = _parse_float(self._get_prop(downstream_lugs, "exported-energy")) + feedthrough_power = _parse_float(self._acc.get_prop(downstream_lugs, "active-power")) + feedthrough_consumed = _parse_float(self._acc.get_prop(downstream_lugs, "imported-energy")) + feedthrough_produced = _parse_float(self._acc.get_prop(downstream_lugs, "exported-energy")) - dl1_i = self._get_prop(downstream_lugs, "l1-current") + dl1_i = self._acc.get_prop(downstream_lugs, "l1-current") downstream_l1_current = _parse_float(dl1_i) if dl1_i else None - dl2_i = self._get_prop(downstream_lugs, "l2-current") + dl2_i = self._acc.get_prop(downstream_lugs, "l2-current") downstream_l2_current = _parse_float(dl2_i) if dl2_i else None # Power flows - pf_node = self.find_node_by_type(TYPE_POWER_FLOWS) + pf_node = self._acc.find_node_by_type(TYPE_POWER_FLOWS) power_flow_pv: float | None = None power_flow_battery: float | None = None power_flow_grid: float | None = None power_flow_site: float | None = None if pf_node is not None: - pf_pv = self._get_prop(pf_node, "pv") + pf_pv = self._acc.get_prop(pf_node, "pv") power_flow_pv = _parse_float(pf_pv) if pf_pv else None - pf_bat = self._get_prop(pf_node, "battery") + pf_bat = self._acc.get_prop(pf_node, "battery") power_flow_battery = _parse_float(pf_bat) if pf_bat else None - pf_grid = self._get_prop(pf_node, "grid") + pf_grid = self._acc.get_prop(pf_node, "grid") power_flow_grid = _parse_float(pf_grid) if pf_grid else None - pf_site = self._get_prop(pf_node, "site") + pf_site = self._acc.get_prop(pf_node, "site") power_flow_site = _parse_float(pf_site) if pf_site else None # Build metadata annotations from PV/EVSE metadata nodes @@ -583,7 +553,7 @@ def _build_snapshot(self) -> SpanPanelSnapshot: # Circuits circuits: dict[str, SpanCircuitSnapshot] = {} - for node_id in self._node_types: + for node_id in self._acc.all_node_types(): if self._is_circuit_node(node_id): meta = feed_metadata.get(node_id, {}) device_type = meta.get("device_type", "circuit") @@ -601,10 +571,10 @@ def _build_snapshot(self) -> SpanPanelSnapshot: evse = self._build_evse_devices() # BESS grid state for v2-native field - bess_node = self.find_node_by_type(TYPE_BESS) + bess_node = self._acc.find_node_by_type(TYPE_BESS) grid_state: str | None = None if bess_node is not None: - gs = self._get_prop(bess_node, "grid-state") + gs = self._acc.get_prop(bess_node, "grid-state") grid_state = gs if gs else None # Derived state values @@ -612,10 +582,10 @@ def _build_snapshot(self) -> SpanPanelSnapshot: current_run_config = self._derive_run_config(dsm_state, grid_islandable, dominant_power_source) # Connection uptime since $state==ready - uptime = int(time.monotonic() - self._ready_since) if self._ready_since > 0.0 else 0 + uptime = int(time.monotonic() - self._acc.ready_since) if self._acc.ready_since > 0.0 else 0 return SpanPanelSnapshot( - serial_number=self._serial_number, + serial_number=self._acc.serial_number, firmware_version=firmware, main_relay_state=main_relay, instant_grid_power_w=grid_power, @@ -627,7 +597,7 @@ def _build_snapshot(self) -> SpanPanelSnapshot: dsm_state=dsm_state, current_run_config=current_run_config, door_state=door_state, - proximity_proven=self._state == HOMIE_STATE_READY, + proximity_proven=self._acc.is_ready(), uptime_s=uptime, eth0_link=eth0, wlan_link=wlan, diff --git a/tests/test_accumulator.py b/tests/test_accumulator.py new file mode 100644 index 0000000..8a08d03 --- /dev/null +++ b/tests/test_accumulator.py @@ -0,0 +1,439 @@ +"""Tests for HomiePropertyAccumulator with lifecycle management. + +Covers: +- All lifecycle transitions (description-first, state-first, disconnection, invalid JSON, wrong serial) +- ready_since set on READY transition +- Property storage and defaults +- Timestamp tracking +- Target storage separate from reported values +- /set topics ignored +- Dirty tracking: property change marks dirty, same value doesn't, target change marks dirty, + description marks all dirty, mark_clean clears +- Node queries: find_node_by_type, nodes_by_type, all_node_types +- Callbacks: fire on change only, not on same value, unregister works, exception doesn't propagate +""" + +from __future__ import annotations + +import json +import time +from unittest.mock import patch + +import pytest + +from span_panel_api.mqtt.accumulator import HomieLifecycle, HomiePropertyAccumulator +from span_panel_api.mqtt.const import TOPIC_PREFIX + +SERIAL = "nj-2316-XXXX" +PREFIX = f"{TOPIC_PREFIX}/{SERIAL}" + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _desc(nodes: dict) -> str: + return json.dumps({"nodes": nodes}) + + +SIMPLE_DESC = _desc( + { + "core": {"type": "energy.ebus.device.distribution-enclosure.core"}, + "circuit-1": {"type": "energy.ebus.device.circuit"}, + "circuit-2": {"type": "energy.ebus.device.circuit"}, + "bess-0": {"type": "energy.ebus.device.bess"}, + } +) + + +def _make_ready(acc: HomiePropertyAccumulator, description: str = SIMPLE_DESC) -> None: + """Drive accumulator to READY (description first, then state).""" + acc.handle_message(f"{PREFIX}/$description", description) + acc.handle_message(f"{PREFIX}/$state", "ready") + + +# --------------------------------------------------------------------------- +# Lifecycle: construction +# --------------------------------------------------------------------------- + + +class TestLifecycleConstruction: + def test_initial_state_is_disconnected(self): + acc = HomiePropertyAccumulator(SERIAL) + assert acc.lifecycle == HomieLifecycle.DISCONNECTED + + def test_serial_number_property(self): + acc = HomiePropertyAccumulator(SERIAL) + assert acc.serial_number == SERIAL + + def test_is_ready_false_initially(self): + acc = HomiePropertyAccumulator(SERIAL) + assert not acc.is_ready() + + def test_ready_since_zero_initially(self): + acc = HomiePropertyAccumulator(SERIAL) + assert acc.ready_since == 0.0 + + +# --------------------------------------------------------------------------- +# Lifecycle: description-first ordering +# --------------------------------------------------------------------------- + + +class TestLifecycleDescriptionFirst: + def test_description_moves_to_description_received(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/$description", SIMPLE_DESC) + assert acc.lifecycle == HomieLifecycle.DESCRIPTION_RECEIVED + + def test_state_ready_after_description_moves_to_ready(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/$description", SIMPLE_DESC) + acc.handle_message(f"{PREFIX}/$state", "ready") + assert acc.lifecycle == HomieLifecycle.READY + assert acc.is_ready() + + def test_ready_since_set_on_ready(self): + acc = HomiePropertyAccumulator(SERIAL) + before = time.monotonic() + _make_ready(acc) + after = time.monotonic() + assert before <= acc.ready_since <= after + + +# --------------------------------------------------------------------------- +# Lifecycle: state-first ordering +# --------------------------------------------------------------------------- + + +class TestLifecycleStateFirst: + def test_state_ready_before_description_stays_connected(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/$state", "ready") + assert acc.lifecycle == HomieLifecycle.CONNECTED + assert not acc.is_ready() + + def test_description_after_state_ready_moves_to_ready(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/$state", "ready") + acc.handle_message(f"{PREFIX}/$description", SIMPLE_DESC) + assert acc.lifecycle == HomieLifecycle.READY + assert acc.is_ready() + + def test_ready_since_set_when_both_arrive(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/$state", "ready") + before = time.monotonic() + acc.handle_message(f"{PREFIX}/$description", SIMPLE_DESC) + after = time.monotonic() + assert before <= acc.ready_since <= after + + +# --------------------------------------------------------------------------- +# Lifecycle: disconnection +# --------------------------------------------------------------------------- + + +class TestLifecycleDisconnection: + def test_disconnected_state_resets_to_disconnected(self): + acc = HomiePropertyAccumulator(SERIAL) + _make_ready(acc) + acc.handle_message(f"{PREFIX}/$state", "disconnected") + assert acc.lifecycle == HomieLifecycle.DISCONNECTED + assert not acc.is_ready() + + def test_lost_state_resets_to_disconnected(self): + acc = HomiePropertyAccumulator(SERIAL) + _make_ready(acc) + acc.handle_message(f"{PREFIX}/$state", "lost") + assert acc.lifecycle == HomieLifecycle.DISCONNECTED + + def test_ready_since_preserved_after_disconnect(self): + """ready_since records the last READY transition; not cleared on disconnect.""" + acc = HomiePropertyAccumulator(SERIAL) + _make_ready(acc) + rs = acc.ready_since + acc.handle_message(f"{PREFIX}/$state", "disconnected") + assert acc.ready_since == rs + + def test_init_state_moves_to_connected(self): + """$state=init is a valid non-ready state; should be CONNECTED.""" + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/$state", "init") + assert acc.lifecycle == HomieLifecycle.CONNECTED + + +# --------------------------------------------------------------------------- +# Lifecycle: invalid JSON description +# --------------------------------------------------------------------------- + + +class TestLifecycleInvalidDescription: + def test_invalid_json_stays_in_current_state(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/$state", "ready") + acc.handle_message(f"{PREFIX}/$description", "{bad json") + assert acc.lifecycle == HomieLifecycle.CONNECTED + assert not acc.is_ready() + + +# --------------------------------------------------------------------------- +# Lifecycle: wrong serial +# --------------------------------------------------------------------------- + + +class TestWrongSerial: + def test_messages_for_wrong_serial_ignored(self): + acc = HomiePropertyAccumulator(SERIAL) + other_prefix = f"{TOPIC_PREFIX}/other-serial" + acc.handle_message(f"{other_prefix}/$state", "ready") + acc.handle_message(f"{other_prefix}/$description", SIMPLE_DESC) + assert acc.lifecycle == HomieLifecycle.DISCONNECTED + + +# --------------------------------------------------------------------------- +# Property storage and defaults +# --------------------------------------------------------------------------- + + +class TestPropertyStorage: + def test_get_prop_default(self): + acc = HomiePropertyAccumulator(SERIAL) + assert acc.get_prop("node", "prop") == "" + + def test_get_prop_custom_default(self): + acc = HomiePropertyAccumulator(SERIAL) + assert acc.get_prop("node", "prop", default="X") == "X" + + def test_get_prop_after_store(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/core/name", "My Panel") + assert acc.get_prop("core", "name") == "My Panel" + + def test_property_overwrite(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/core/name", "Old") + acc.handle_message(f"{PREFIX}/core/name", "New") + assert acc.get_prop("core", "name") == "New" + + +# --------------------------------------------------------------------------- +# Timestamp tracking +# --------------------------------------------------------------------------- + + +class TestTimestampTracking: + def test_timestamp_zero_for_unknown(self): + acc = HomiePropertyAccumulator(SERIAL) + assert acc.get_timestamp("node", "prop") == 0 + + def test_timestamp_set_on_property(self): + acc = HomiePropertyAccumulator(SERIAL) + before = int(time.time()) + acc.handle_message(f"{PREFIX}/core/power", "100") + after = int(time.time()) + ts = acc.get_timestamp("core", "power") + assert before <= ts <= after + + def test_timestamp_updates_on_overwrite(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/core/power", "100") + ts1 = acc.get_timestamp("core", "power") + # same value re-sent — timestamp still updates + acc.handle_message(f"{PREFIX}/core/power", "100") + ts2 = acc.get_timestamp("core", "power") + assert ts2 >= ts1 + + +# --------------------------------------------------------------------------- +# Target storage +# --------------------------------------------------------------------------- + + +class TestTargetStorage: + def test_target_none_by_default(self): + acc = HomiePropertyAccumulator(SERIAL) + assert acc.get_target("node", "prop") is None + assert not acc.has_target("node", "prop") + + def test_target_stored_from_dollar_target(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/core/relay/$target", "OPEN") + assert acc.get_target("core", "relay") == "OPEN" + assert acc.has_target("core", "relay") + + def test_target_independent_of_reported_value(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/core/relay", "CLOSED") + acc.handle_message(f"{PREFIX}/core/relay/$target", "OPEN") + assert acc.get_prop("core", "relay") == "CLOSED" + assert acc.get_target("core", "relay") == "OPEN" + + +# --------------------------------------------------------------------------- +# /set topics ignored +# --------------------------------------------------------------------------- + + +class TestSetTopicsIgnored: + def test_set_topic_not_stored(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/core/relay/set", "OPEN") + assert acc.get_prop("core", "relay") == "" + + def test_set_topic_not_dirty(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/core/relay/set", "OPEN") + assert len(acc.dirty_node_ids()) == 0 + + +# --------------------------------------------------------------------------- +# Dirty tracking +# --------------------------------------------------------------------------- + + +class TestDirtyTracking: + def test_property_change_marks_dirty(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/core/power", "100") + assert "core" in acc.dirty_node_ids() + + def test_same_value_not_dirty(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/core/power", "100") + acc.mark_clean() + acc.handle_message(f"{PREFIX}/core/power", "100") + assert "core" not in acc.dirty_node_ids() + + def test_target_change_marks_dirty(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/core/relay/$target", "OPEN") + assert "core" in acc.dirty_node_ids() + + def test_same_target_not_dirty(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/core/relay/$target", "OPEN") + acc.mark_clean() + acc.handle_message(f"{PREFIX}/core/relay/$target", "OPEN") + assert "core" not in acc.dirty_node_ids() + + def test_description_marks_all_dirty(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/$description", SIMPLE_DESC) + dirty = acc.dirty_node_ids() + assert "core" in dirty + assert "circuit-1" in dirty + assert "circuit-2" in dirty + assert "bess-0" in dirty + + def test_mark_clean_clears(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/core/power", "100") + acc.mark_clean() + assert len(acc.dirty_node_ids()) == 0 + + def test_dirty_returns_frozenset(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/core/power", "100") + result = acc.dirty_node_ids() + assert isinstance(result, frozenset) + + +# --------------------------------------------------------------------------- +# Node queries +# --------------------------------------------------------------------------- + + +class TestNodeQueries: + def test_find_node_by_type(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/$description", SIMPLE_DESC) + assert acc.find_node_by_type("energy.ebus.device.distribution-enclosure.core") == "core" + + def test_find_node_by_type_missing(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/$description", SIMPLE_DESC) + assert acc.find_node_by_type("nonexistent") is None + + def test_nodes_by_type(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/$description", SIMPLE_DESC) + circuits = acc.nodes_by_type("energy.ebus.device.circuit") + assert sorted(circuits) == ["circuit-1", "circuit-2"] + + def test_nodes_by_type_empty(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/$description", SIMPLE_DESC) + assert acc.nodes_by_type("nonexistent") == [] + + def test_all_node_types(self): + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/$description", SIMPLE_DESC) + types = acc.all_node_types() + assert types["core"] == "energy.ebus.device.distribution-enclosure.core" + assert types["bess-0"] == "energy.ebus.device.bess" + assert len(types) == 4 + + +# --------------------------------------------------------------------------- +# Callbacks +# --------------------------------------------------------------------------- + + +class TestCallbacks: + def test_callback_fires_on_change(self): + acc = HomiePropertyAccumulator(SERIAL) + calls = [] + acc.register_property_callback(lambda n, p, v, old: calls.append((n, p, v, old))) + acc.handle_message(f"{PREFIX}/core/power", "100") + assert len(calls) == 1 + assert calls[0] == ("core", "power", "100", None) + + def test_callback_not_fired_on_same_value(self): + acc = HomiePropertyAccumulator(SERIAL) + calls = [] + acc.register_property_callback(lambda n, p, v, old: calls.append((n, p, v, old))) + acc.handle_message(f"{PREFIX}/core/power", "100") + acc.handle_message(f"{PREFIX}/core/power", "100") + assert len(calls) == 1 + + def test_callback_fires_on_value_change(self): + acc = HomiePropertyAccumulator(SERIAL) + calls = [] + acc.register_property_callback(lambda n, p, v, old: calls.append((n, p, v, old))) + acc.handle_message(f"{PREFIX}/core/power", "100") + acc.handle_message(f"{PREFIX}/core/power", "200") + assert len(calls) == 2 + assert calls[1] == ("core", "power", "200", "100") + + def test_unregister_callback(self): + acc = HomiePropertyAccumulator(SERIAL) + calls = [] + unregister = acc.register_property_callback(lambda n, p, v, old: calls.append(1)) + acc.handle_message(f"{PREFIX}/core/power", "100") + unregister() + acc.handle_message(f"{PREFIX}/core/power", "200") + assert len(calls) == 1 + + def test_callback_exception_does_not_propagate(self): + acc = HomiePropertyAccumulator(SERIAL) + + def bad_callback(n: str, p: str, v: str, old: str | None) -> None: + raise RuntimeError("boom") + + acc.register_property_callback(bad_callback) + # Should not raise + acc.handle_message(f"{PREFIX}/core/power", "100") + assert acc.get_prop("core", "power") == "100" + + def test_callback_exception_does_not_block_other_callbacks(self): + acc = HomiePropertyAccumulator(SERIAL) + calls = [] + + def bad_callback(n: str, p: str, v: str, old: str | None) -> None: + raise RuntimeError("boom") + + acc.register_property_callback(bad_callback) + acc.register_property_callback(lambda n, p, v, old: calls.append(1)) + acc.handle_message(f"{PREFIX}/core/power", "100") + assert len(calls) == 1 diff --git a/tests/test_auth_and_homie_helpers.py b/tests/test_auth_and_homie_helpers.py index 7499528..a65fb67 100644 --- a/tests/test_auth_and_homie_helpers.py +++ b/tests/test_auth_and_homie_helpers.py @@ -10,6 +10,7 @@ from span_panel_api.auth import _int, download_ca_cert, get_homie_schema from span_panel_api.exceptions import SpanPanelConnectionError, SpanPanelTimeoutError +from span_panel_api.mqtt.accumulator import HomiePropertyAccumulator from span_panel_api.mqtt.homie import HomieDeviceConsumer, _parse_int @@ -102,7 +103,8 @@ def test_invalid_with_custom_default(self) -> None: class TestHomieCallbackUnregister: def test_unregister_removes_callback(self) -> None: - consumer = HomieDeviceConsumer("test-serial", panel_size=32) + acc = HomiePropertyAccumulator("test-serial") + consumer = HomieDeviceConsumer(acc, panel_size=32) cb = AsyncMock() unregister = consumer.register_property_callback(cb) unregister() diff --git a/tests/test_mqtt_homie.py b/tests/test_mqtt_homie.py index d104f62..473a6d7 100644 --- a/tests/test_mqtt_homie.py +++ b/tests/test_mqtt_homie.py @@ -38,6 +38,7 @@ TYPE_POWER_FLOWS, TYPE_PV, ) +from span_panel_api.mqtt.accumulator import HomiePropertyAccumulator from span_panel_api.mqtt.homie import HomieDeviceConsumer from span_panel_api.mqtt.models import MqttClientConfig from span_panel_api.protocol import ( @@ -75,13 +76,17 @@ def _full_description() -> dict: } -def _build_ready_consumer(description_nodes: dict | None = None) -> HomieDeviceConsumer: - """Create a HomieDeviceConsumer in ready state with given description.""" - consumer = HomieDeviceConsumer(SERIAL, panel_size=32) - consumer.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) +def _build_ready_consumer( + description_nodes: dict | None = None, + panel_size: int = 32, +) -> tuple[HomiePropertyAccumulator, HomieDeviceConsumer]: + """Create accumulator + consumer in ready state with given description.""" + acc = HomiePropertyAccumulator(SERIAL) + consumer = HomieDeviceConsumer(acc, panel_size=panel_size) + acc.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) nodes = description_nodes or _full_description() - consumer.handle_message(f"{PREFIX}/$description", _make_description(nodes)) - return consumer + acc.handle_message(f"{PREFIX}/$description", _make_description(nodes)) + return acc, consumer # --------------------------------------------------------------------------- @@ -123,33 +128,33 @@ def test_frozen(self): class TestHomieConsumerState: def test_not_ready_initially(self): - consumer = HomieDeviceConsumer(SERIAL, panel_size=32) - assert not consumer.is_ready() + acc = HomiePropertyAccumulator(SERIAL) + assert not acc.is_ready() def test_not_ready_state_only(self): - consumer = HomieDeviceConsumer(SERIAL, panel_size=32) - consumer.handle_message(f"{PREFIX}/$state", "ready") - assert not consumer.is_ready() + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/$state", "ready") + assert not acc.is_ready() def test_not_ready_description_only(self): - consumer = HomieDeviceConsumer(SERIAL, panel_size=32) - consumer.handle_message(f"{PREFIX}/$description", _make_description(_core_description())) - assert not consumer.is_ready() + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{PREFIX}/$description", _make_description(_core_description())) + assert not acc.is_ready() def test_ready_when_both(self): - consumer = _build_ready_consumer(_core_description()) - assert consumer.is_ready() + acc, _consumer = _build_ready_consumer(_core_description()) + assert acc.is_ready() def test_ignores_other_serial(self): - consumer = HomieDeviceConsumer(SERIAL, panel_size=32) - consumer.handle_message(f"{TOPIC_PREFIX}/other-serial/$state", "ready") - assert not consumer.is_ready() + acc = HomiePropertyAccumulator(SERIAL) + acc.handle_message(f"{TOPIC_PREFIX}/other-serial/$state", "ready") + assert not acc.is_ready() def test_ignores_set_topics(self): - consumer = _build_ready_consumer() + acc, consumer = _build_ready_consumer() # /set topics should not be handled as property values circuit_node = "aabbccdd-1122-3344-5566-778899001122" - consumer.handle_message(f"{PREFIX}/{circuit_node}/relay/set", "OPEN") + acc.handle_message(f"{PREFIX}/{circuit_node}/relay/set", "OPEN") snapshot = consumer.build_snapshot() circuit = snapshot.circuits.get("aabbccdd11223344556677889900112" + "2") assert circuit is not None @@ -183,48 +188,48 @@ def test_denormalize_non_uuid(self): def test_circuit_power_negation(self): """active-power in W, negative=consumption → positive=consumption in snapshot.""" - consumer = _build_ready_consumer() + acc, consumer = _build_ready_consumer() node = "aabbccdd-1122-3344-5566-778899001122" # -150.0 W consumption → 150.0 W positive in snapshot - consumer.handle_message(f"{PREFIX}/{node}/active-power", "-150.0") + acc.handle_message(f"{PREFIX}/{node}/active-power", "-150.0") snapshot = consumer.build_snapshot() circuit = snapshot.circuits["aabbccdd112233445566778899001122"] assert circuit.instant_power_w == 150.0 def test_circuit_power_positive_generation(self): """Positive active-power (generation) → negative in snapshot.""" - consumer = _build_ready_consumer() + acc, consumer = _build_ready_consumer() node = "aabbccdd-1122-3344-5566-778899001122" - consumer.handle_message(f"{PREFIX}/{node}/active-power", "200.0") + acc.handle_message(f"{PREFIX}/{node}/active-power", "200.0") snapshot = consumer.build_snapshot() circuit = snapshot.circuits["aabbccdd112233445566778899001122"] assert circuit.instant_power_w == -200.0 def test_circuit_energy_mapping(self): """exported-energy → consumed, imported-energy → produced.""" - consumer = _build_ready_consumer() + acc, consumer = _build_ready_consumer() node = "aabbccdd-1122-3344-5566-778899001122" - consumer.handle_message(f"{PREFIX}/{node}/exported-energy", "12345.6") - consumer.handle_message(f"{PREFIX}/{node}/imported-energy", "789.0") + acc.handle_message(f"{PREFIX}/{node}/exported-energy", "12345.6") + acc.handle_message(f"{PREFIX}/{node}/imported-energy", "789.0") snapshot = consumer.build_snapshot() circuit = snapshot.circuits["aabbccdd112233445566778899001122"] assert circuit.consumed_energy_wh == 12345.6 assert circuit.produced_energy_wh == 789.0 def test_circuit_properties(self): - consumer = _build_ready_consumer() + acc, consumer = _build_ready_consumer() node = "aabbccdd-1122-3344-5566-778899001122" - consumer.handle_message(f"{PREFIX}/{node}/name", "Kitchen") - consumer.handle_message(f"{PREFIX}/{node}/relay", "CLOSED") - consumer.handle_message(f"{PREFIX}/{node}/shed-priority", "NEVER") - consumer.handle_message(f"{PREFIX}/{node}/space", "5") - consumer.handle_message(f"{PREFIX}/{node}/dipole", "true") - consumer.handle_message(f"{PREFIX}/{node}/sheddable", "true") - consumer.handle_message(f"{PREFIX}/{node}/never-backup", "false") - consumer.handle_message(f"{PREFIX}/{node}/always-on", "true") - consumer.handle_message(f"{PREFIX}/{node}/current", "15.2") - consumer.handle_message(f"{PREFIX}/{node}/breaker-rating", "20") - consumer.handle_message(f"{PREFIX}/{node}/relay-requester", "USER") + acc.handle_message(f"{PREFIX}/{node}/name", "Kitchen") + acc.handle_message(f"{PREFIX}/{node}/relay", "CLOSED") + acc.handle_message(f"{PREFIX}/{node}/shed-priority", "NEVER") + acc.handle_message(f"{PREFIX}/{node}/space", "5") + acc.handle_message(f"{PREFIX}/{node}/dipole", "true") + acc.handle_message(f"{PREFIX}/{node}/sheddable", "true") + acc.handle_message(f"{PREFIX}/{node}/never-backup", "false") + acc.handle_message(f"{PREFIX}/{node}/always-on", "true") + acc.handle_message(f"{PREFIX}/{node}/current", "15.2") + acc.handle_message(f"{PREFIX}/{node}/breaker-rating", "20") + acc.handle_message(f"{PREFIX}/{node}/relay-requester", "USER") snapshot = consumer.build_snapshot() circuit = snapshot.circuits["aabbccdd112233445566778899001122"] @@ -243,11 +248,11 @@ def test_circuit_properties(self): assert circuit.relay_requester == "USER" def test_circuit_timestamps(self): - consumer = _build_ready_consumer() + acc, consumer = _build_ready_consumer() node = "aabbccdd-1122-3344-5566-778899001122" before = int(time.time()) - consumer.handle_message(f"{PREFIX}/{node}/active-power", "-1.0") - consumer.handle_message(f"{PREFIX}/{node}/exported-energy", "100.0") + acc.handle_message(f"{PREFIX}/{node}/active-power", "-1.0") + acc.handle_message(f"{PREFIX}/{node}/exported-energy", "100.0") after = int(time.time()) snapshot = consumer.build_snapshot() @@ -258,7 +263,7 @@ def test_circuit_timestamps(self): def test_pv_metadata_node_annotates_circuit(self): """PV metadata node's feed property sets device_type and relative_position.""" circuit_uuid = "aabbccdd-1122-3344-5566-778899001122" - consumer = _build_ready_consumer( + acc, consumer = _build_ready_consumer( { "core": {"type": TYPE_CORE}, circuit_uuid: {"type": TYPE_CIRCUIT}, @@ -267,11 +272,11 @@ def test_pv_metadata_node_annotates_circuit(self): } ) # PV node references the circuit via feed and has relative-position - consumer.handle_message(f"{PREFIX}/pv/feed", circuit_uuid) - consumer.handle_message(f"{PREFIX}/pv/relative-position", "IN_PANEL") - consumer.handle_message(f"{PREFIX}/{circuit_uuid}/name", "Solar Panels") - consumer.handle_message(f"{PREFIX}/{circuit_uuid}/space", "30") - consumer.handle_message(f"{PREFIX}/{circuit_uuid}/dipole", "true") + acc.handle_message(f"{PREFIX}/pv/feed", circuit_uuid) + acc.handle_message(f"{PREFIX}/pv/relative-position", "IN_PANEL") + acc.handle_message(f"{PREFIX}/{circuit_uuid}/name", "Solar Panels") + acc.handle_message(f"{PREFIX}/{circuit_uuid}/space", "30") + acc.handle_message(f"{PREFIX}/{circuit_uuid}/dipole", "true") snapshot = consumer.build_snapshot() circuit = snapshot.circuits["aabbccdd112233445566778899001122"] @@ -284,7 +289,7 @@ def test_pv_metadata_node_annotates_circuit(self): def test_pv_downstream_has_breaker_position(self): """PV with relative-position=DOWNSTREAM indicates a breaker-connected PV.""" circuit_uuid = "aabbccdd-1122-3344-5566-778899001122" - consumer = _build_ready_consumer( + acc, consumer = _build_ready_consumer( { "core": {"type": TYPE_CORE}, circuit_uuid: {"type": TYPE_CIRCUIT}, @@ -292,10 +297,10 @@ def test_pv_downstream_has_breaker_position(self): "bess-0": {"type": TYPE_BESS}, } ) - consumer.handle_message(f"{PREFIX}/pv/feed", circuit_uuid) - consumer.handle_message(f"{PREFIX}/pv/relative-position", "DOWNSTREAM") - consumer.handle_message(f"{PREFIX}/{circuit_uuid}/name", "Solar Breaker") - consumer.handle_message(f"{PREFIX}/{circuit_uuid}/space", "15") + acc.handle_message(f"{PREFIX}/pv/feed", circuit_uuid) + acc.handle_message(f"{PREFIX}/pv/relative-position", "DOWNSTREAM") + acc.handle_message(f"{PREFIX}/{circuit_uuid}/name", "Solar Breaker") + acc.handle_message(f"{PREFIX}/{circuit_uuid}/space", "15") snapshot = consumer.build_snapshot() circuit = snapshot.circuits["aabbccdd112233445566778899001122"] @@ -304,8 +309,8 @@ def test_pv_downstream_has_breaker_position(self): def test_pv_metadata_node_excluded_from_circuits(self): """PV/EVSE metadata nodes should not appear as circuit entities.""" - consumer = _build_ready_consumer() - consumer.handle_message(f"{PREFIX}/pv/feed", "aabbccdd-1122-3344-5566-778899001122") + acc, consumer = _build_ready_consumer() + acc.handle_message(f"{PREFIX}/pv/feed", "aabbccdd-1122-3344-5566-778899001122") snapshot = consumer.build_snapshot() # The "pv" node itself should not be in circuits @@ -313,9 +318,9 @@ def test_pv_metadata_node_excluded_from_circuits(self): def test_circuit_default_device_type(self): """Regular circuits without PV/EVSE feed have device_type='circuit' and no relative_position.""" - consumer = _build_ready_consumer() + acc, consumer = _build_ready_consumer() node = "aabbccdd-1122-3344-5566-778899001122" - consumer.handle_message(f"{PREFIX}/{node}/name", "Kitchen") + acc.handle_message(f"{PREFIX}/{node}/name", "Kitchen") snapshot = consumer.build_snapshot() circuit = snapshot.circuits["aabbccdd112233445566778899001122"] @@ -330,19 +335,19 @@ def test_circuit_default_device_type(self): class TestHomieCoreNode: def test_core_properties(self): - consumer = _build_ready_consumer() - consumer.handle_message(f"{PREFIX}/core/software-version", "spanos2/r202603/05") - consumer.handle_message(f"{PREFIX}/core/door", "CLOSED") - consumer.handle_message(f"{PREFIX}/core/relay", "CLOSED") - consumer.handle_message(f"{PREFIX}/core/ethernet", "true") - consumer.handle_message(f"{PREFIX}/core/wifi", "true") - consumer.handle_message(f"{PREFIX}/core/wifi-ssid", "MyNetwork") - consumer.handle_message(f"{PREFIX}/core/vendor-cloud", "CONNECTED") - consumer.handle_message(f"{PREFIX}/core/dominant-power-source", "GRID") - consumer.handle_message(f"{PREFIX}/core/grid-islandable", "true") - consumer.handle_message(f"{PREFIX}/core/l1-voltage", "121.5") - consumer.handle_message(f"{PREFIX}/core/l2-voltage", "120.8") - consumer.handle_message(f"{PREFIX}/core/breaker-rating", "200") + acc, consumer = _build_ready_consumer() + acc.handle_message(f"{PREFIX}/core/software-version", "spanos2/r202603/05") + acc.handle_message(f"{PREFIX}/core/door", "CLOSED") + acc.handle_message(f"{PREFIX}/core/relay", "CLOSED") + acc.handle_message(f"{PREFIX}/core/ethernet", "true") + acc.handle_message(f"{PREFIX}/core/wifi", "true") + acc.handle_message(f"{PREFIX}/core/wifi-ssid", "MyNetwork") + acc.handle_message(f"{PREFIX}/core/vendor-cloud", "CONNECTED") + acc.handle_message(f"{PREFIX}/core/dominant-power-source", "GRID") + acc.handle_message(f"{PREFIX}/core/grid-islandable", "true") + acc.handle_message(f"{PREFIX}/core/l1-voltage", "121.5") + acc.handle_message(f"{PREFIX}/core/l2-voltage", "120.8") + acc.handle_message(f"{PREFIX}/core/breaker-rating", "200") snapshot = consumer.build_snapshot() assert snapshot.firmware_version == "spanos2/r202603/05" @@ -360,12 +365,12 @@ def test_core_properties(self): assert snapshot.main_breaker_rating_a == 200 def test_proximity_proven_when_ready(self): - consumer = _build_ready_consumer() + _acc, consumer = _build_ready_consumer() snapshot = consumer.build_snapshot() assert snapshot.proximity_proven is True def test_uptime_increases(self): - consumer = _build_ready_consumer() + _acc, consumer = _build_ready_consumer() snapshot1 = consumer.build_snapshot() # uptime should be >= 0 assert snapshot1.uptime_s >= 0 @@ -382,15 +387,15 @@ class TestHomieDsmDerivation: # -- dsm_state: BESS authoritative -- def test_bess_on_grid_authoritative(self): - consumer = _build_ready_consumer() - consumer.handle_message(f"{PREFIX}/bess-0/grid-state", "ON_GRID") + acc, consumer = _build_ready_consumer() + acc.handle_message(f"{PREFIX}/bess-0/grid-state", "ON_GRID") snapshot = consumer.build_snapshot() assert snapshot.dsm_state == "DSM_ON_GRID" assert snapshot.grid_state == "ON_GRID" def test_bess_off_grid_authoritative(self): - consumer = _build_ready_consumer() - consumer.handle_message(f"{PREFIX}/bess-0/grid-state", "OFF_GRID") + acc, consumer = _build_ready_consumer() + acc.handle_message(f"{PREFIX}/bess-0/grid-state", "OFF_GRID") snapshot = consumer.build_snapshot() assert snapshot.dsm_state == "DSM_OFF_GRID" @@ -398,75 +403,75 @@ def test_bess_off_grid_authoritative(self): def test_dps_grid_implies_on_grid(self): """DPS=GRID → DSM_ON_GRID even without BESS.""" - consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}}) - consumer.handle_message(f"{PREFIX}/core/dominant-power-source", "GRID") + acc, consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}}) + acc.handle_message(f"{PREFIX}/core/dominant-power-source", "GRID") snapshot = consumer.build_snapshot() assert snapshot.dsm_state == "DSM_ON_GRID" def test_dps_battery_with_grid_power_on_grid(self): """DPS=BATTERY but grid still exchanging power → DSM_ON_GRID.""" - consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}, "lugs-upstream": {"type": TYPE_LUGS_UPSTREAM}}) - consumer.handle_message(f"{PREFIX}/core/dominant-power-source", "BATTERY") - consumer.handle_message(f"{PREFIX}/lugs-upstream/active-power", "500.0") + acc, consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}, "lugs-upstream": {"type": TYPE_LUGS_UPSTREAM}}) + acc.handle_message(f"{PREFIX}/core/dominant-power-source", "BATTERY") + acc.handle_message(f"{PREFIX}/lugs-upstream/active-power", "500.0") snapshot = consumer.build_snapshot() assert snapshot.dsm_state == "DSM_ON_GRID" def test_dps_battery_zero_lugs_nonzero_power_flow_on_grid(self): """DPS=BATTERY, zero lugs but power-flows/grid non-zero → DSM_ON_GRID.""" - consumer = _build_ready_consumer( + acc, consumer = _build_ready_consumer( { "core": {"type": TYPE_CORE}, "lugs-upstream": {"type": TYPE_LUGS_UPSTREAM}, "power-flows": {"type": TYPE_POWER_FLOWS}, } ) - consumer.handle_message(f"{PREFIX}/core/dominant-power-source", "BATTERY") - consumer.handle_message(f"{PREFIX}/lugs-upstream/active-power", "0.0") - consumer.handle_message(f"{PREFIX}/power-flows/grid", "-5.0") + acc.handle_message(f"{PREFIX}/core/dominant-power-source", "BATTERY") + acc.handle_message(f"{PREFIX}/lugs-upstream/active-power", "0.0") + acc.handle_message(f"{PREFIX}/power-flows/grid", "-5.0") snapshot = consumer.build_snapshot() assert snapshot.dsm_state == "DSM_ON_GRID" def test_dps_battery_zero_both_grid_signals_off_grid(self): """DPS=BATTERY, both lugs and power-flows/grid zero → DSM_OFF_GRID.""" - consumer = _build_ready_consumer( + acc, consumer = _build_ready_consumer( { "core": {"type": TYPE_CORE}, "lugs-upstream": {"type": TYPE_LUGS_UPSTREAM}, "power-flows": {"type": TYPE_POWER_FLOWS}, } ) - consumer.handle_message(f"{PREFIX}/core/dominant-power-source", "BATTERY") - consumer.handle_message(f"{PREFIX}/lugs-upstream/active-power", "0.0") - consumer.handle_message(f"{PREFIX}/power-flows/grid", "0.0") + acc.handle_message(f"{PREFIX}/core/dominant-power-source", "BATTERY") + acc.handle_message(f"{PREFIX}/lugs-upstream/active-power", "0.0") + acc.handle_message(f"{PREFIX}/power-flows/grid", "0.0") snapshot = consumer.build_snapshot() assert snapshot.dsm_state == "DSM_OFF_GRID" def test_dps_battery_zero_grid_power_off_grid(self): """DPS=BATTERY and zero grid power (no power-flows node) → DSM_OFF_GRID.""" - consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}, "lugs-upstream": {"type": TYPE_LUGS_UPSTREAM}}) - consumer.handle_message(f"{PREFIX}/core/dominant-power-source", "BATTERY") - consumer.handle_message(f"{PREFIX}/lugs-upstream/active-power", "0.0") + acc, consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}, "lugs-upstream": {"type": TYPE_LUGS_UPSTREAM}}) + acc.handle_message(f"{PREFIX}/core/dominant-power-source", "BATTERY") + acc.handle_message(f"{PREFIX}/lugs-upstream/active-power", "0.0") snapshot = consumer.build_snapshot() assert snapshot.dsm_state == "DSM_OFF_GRID" def test_dps_pv_with_grid_power_on_grid(self): """DPS=PV but grid still exchanging → DSM_ON_GRID.""" - consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}, "lugs-upstream": {"type": TYPE_LUGS_UPSTREAM}}) - consumer.handle_message(f"{PREFIX}/core/dominant-power-source", "PV") - consumer.handle_message(f"{PREFIX}/lugs-upstream/active-power", "-200.0") + acc, consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}, "lugs-upstream": {"type": TYPE_LUGS_UPSTREAM}}) + acc.handle_message(f"{PREFIX}/core/dominant-power-source", "PV") + acc.handle_message(f"{PREFIX}/lugs-upstream/active-power", "-200.0") snapshot = consumer.build_snapshot() assert snapshot.dsm_state == "DSM_ON_GRID" def test_dps_none_returns_unknown(self): """DPS=NONE → UNKNOWN (not a known power source).""" - consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}}) - consumer.handle_message(f"{PREFIX}/core/dominant-power-source", "NONE") + acc, consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}}) + acc.handle_message(f"{PREFIX}/core/dominant-power-source", "NONE") snapshot = consumer.build_snapshot() assert snapshot.dsm_state == "UNKNOWN" def test_no_core_returns_unknown(self): """No core node at all → UNKNOWN.""" - consumer = _build_ready_consumer({"bess-0": {"type": TYPE_BESS}}) + _acc, consumer = _build_ready_consumer({"bess-0": {"type": TYPE_BESS}}) snapshot = consumer.build_snapshot() assert snapshot.dsm_state == "UNKNOWN" @@ -474,55 +479,55 @@ def test_no_core_returns_unknown(self): def test_on_grid_dps_grid(self): """DPS=GRID → PANEL_ON_GRID.""" - consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}}) - consumer.handle_message(f"{PREFIX}/core/dominant-power-source", "GRID") + acc, consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}}) + acc.handle_message(f"{PREFIX}/core/dominant-power-source", "GRID") snapshot = consumer.build_snapshot() assert snapshot.current_run_config == "PANEL_ON_GRID" def test_off_grid_battery_islandable_backup(self): """Off-grid + islandable + BATTERY → PANEL_BACKUP.""" - consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}, "bess-0": {"type": TYPE_BESS}}) - consumer.handle_message(f"{PREFIX}/core/dominant-power-source", "BATTERY") - consumer.handle_message(f"{PREFIX}/core/grid-islandable", "true") - consumer.handle_message(f"{PREFIX}/bess-0/grid-state", "OFF_GRID") + acc, consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}, "bess-0": {"type": TYPE_BESS}}) + acc.handle_message(f"{PREFIX}/core/dominant-power-source", "BATTERY") + acc.handle_message(f"{PREFIX}/core/grid-islandable", "true") + acc.handle_message(f"{PREFIX}/bess-0/grid-state", "OFF_GRID") snapshot = consumer.build_snapshot() assert snapshot.dsm_state == "DSM_OFF_GRID" assert snapshot.current_run_config == "PANEL_BACKUP" def test_off_grid_pv_islandable_off_grid(self): """Off-grid + islandable + PV → PANEL_OFF_GRID (intentional off-grid).""" - consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}, "bess-0": {"type": TYPE_BESS}}) - consumer.handle_message(f"{PREFIX}/core/dominant-power-source", "PV") - consumer.handle_message(f"{PREFIX}/core/grid-islandable", "true") - consumer.handle_message(f"{PREFIX}/bess-0/grid-state", "OFF_GRID") + acc, consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}, "bess-0": {"type": TYPE_BESS}}) + acc.handle_message(f"{PREFIX}/core/dominant-power-source", "PV") + acc.handle_message(f"{PREFIX}/core/grid-islandable", "true") + acc.handle_message(f"{PREFIX}/bess-0/grid-state", "OFF_GRID") snapshot = consumer.build_snapshot() assert snapshot.dsm_state == "DSM_OFF_GRID" assert snapshot.current_run_config == "PANEL_OFF_GRID" def test_off_grid_generator_islandable_off_grid(self): """Off-grid + islandable + GENERATOR → PANEL_OFF_GRID.""" - consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}, "bess-0": {"type": TYPE_BESS}}) - consumer.handle_message(f"{PREFIX}/core/dominant-power-source", "GENERATOR") - consumer.handle_message(f"{PREFIX}/core/grid-islandable", "true") - consumer.handle_message(f"{PREFIX}/bess-0/grid-state", "OFF_GRID") + acc, consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}, "bess-0": {"type": TYPE_BESS}}) + acc.handle_message(f"{PREFIX}/core/dominant-power-source", "GENERATOR") + acc.handle_message(f"{PREFIX}/core/grid-islandable", "true") + acc.handle_message(f"{PREFIX}/bess-0/grid-state", "OFF_GRID") snapshot = consumer.build_snapshot() assert snapshot.current_run_config == "PANEL_OFF_GRID" def test_off_grid_not_islandable_unknown(self): """Off-grid + not islandable → UNKNOWN (shouldn't happen).""" - consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}, "bess-0": {"type": TYPE_BESS}}) - consumer.handle_message(f"{PREFIX}/core/dominant-power-source", "BATTERY") - consumer.handle_message(f"{PREFIX}/core/grid-islandable", "false") - consumer.handle_message(f"{PREFIX}/bess-0/grid-state", "OFF_GRID") + acc, consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}, "bess-0": {"type": TYPE_BESS}}) + acc.handle_message(f"{PREFIX}/core/dominant-power-source", "BATTERY") + acc.handle_message(f"{PREFIX}/core/grid-islandable", "false") + acc.handle_message(f"{PREFIX}/bess-0/grid-state", "OFF_GRID") snapshot = consumer.build_snapshot() assert snapshot.current_run_config == "UNKNOWN" def test_off_grid_islandable_dps_none_unknown(self): """Off-grid + islandable + DPS=NONE → UNKNOWN.""" - consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}, "bess-0": {"type": TYPE_BESS}}) - consumer.handle_message(f"{PREFIX}/core/dominant-power-source", "NONE") - consumer.handle_message(f"{PREFIX}/core/grid-islandable", "true") - consumer.handle_message(f"{PREFIX}/bess-0/grid-state", "OFF_GRID") + acc, consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}, "bess-0": {"type": TYPE_BESS}}) + acc.handle_message(f"{PREFIX}/core/dominant-power-source", "NONE") + acc.handle_message(f"{PREFIX}/core/grid-islandable", "true") + acc.handle_message(f"{PREFIX}/bess-0/grid-state", "OFF_GRID") snapshot = consumer.build_snapshot() assert snapshot.current_run_config == "UNKNOWN" @@ -535,10 +540,10 @@ def test_off_grid_islandable_dps_none_unknown(self): class TestHomieLugs: def test_upstream_lugs_to_main_meter(self): """Test typed lugs (energy.ebus.device.lugs.upstream) map to main meter.""" - consumer = _build_ready_consumer() - consumer.handle_message(f"{PREFIX}/lugs-upstream/active-power", "5000.0") - consumer.handle_message(f"{PREFIX}/lugs-upstream/imported-energy", "100000.0") - consumer.handle_message(f"{PREFIX}/lugs-upstream/exported-energy", "5000.0") + acc, consumer = _build_ready_consumer() + acc.handle_message(f"{PREFIX}/lugs-upstream/active-power", "5000.0") + acc.handle_message(f"{PREFIX}/lugs-upstream/imported-energy", "100000.0") + acc.handle_message(f"{PREFIX}/lugs-upstream/exported-energy", "5000.0") snapshot = consumer.build_snapshot() assert snapshot.instant_grid_power_w == 5000.0 @@ -548,10 +553,10 @@ def test_upstream_lugs_to_main_meter(self): def test_downstream_lugs_to_feedthrough(self): """Test typed lugs (energy.ebus.device.lugs.downstream) map to feedthrough.""" - consumer = _build_ready_consumer() - consumer.handle_message(f"{PREFIX}/lugs-downstream/active-power", "1000.0") - consumer.handle_message(f"{PREFIX}/lugs-downstream/imported-energy", "50000.0") - consumer.handle_message(f"{PREFIX}/lugs-downstream/exported-energy", "1000.0") + acc, consumer = _build_ready_consumer() + acc.handle_message(f"{PREFIX}/lugs-downstream/active-power", "1000.0") + acc.handle_message(f"{PREFIX}/lugs-downstream/imported-energy", "50000.0") + acc.handle_message(f"{PREFIX}/lugs-downstream/exported-energy", "1000.0") snapshot = consumer.build_snapshot() assert snapshot.feedthrough_power_w == 1000.0 @@ -560,7 +565,7 @@ def test_downstream_lugs_to_feedthrough(self): def test_generic_lugs_with_direction_property(self): """Test fallback: generic TYPE_LUGS + direction property.""" - consumer = _build_ready_consumer( + acc, consumer = _build_ready_consumer( { "core": {"type": TYPE_CORE}, "upstream-lugs": {"type": TYPE_LUGS}, @@ -568,15 +573,15 @@ def test_generic_lugs_with_direction_property(self): "bess-0": {"type": TYPE_BESS}, } ) - consumer.handle_message(f"{PREFIX}/upstream-lugs/direction", "UPSTREAM") - consumer.handle_message(f"{PREFIX}/upstream-lugs/active-power", "800.0") - consumer.handle_message(f"{PREFIX}/upstream-lugs/imported-energy", "90000.0") - consumer.handle_message(f"{PREFIX}/upstream-lugs/exported-energy", "3000.0") + acc.handle_message(f"{PREFIX}/upstream-lugs/direction", "UPSTREAM") + acc.handle_message(f"{PREFIX}/upstream-lugs/active-power", "800.0") + acc.handle_message(f"{PREFIX}/upstream-lugs/imported-energy", "90000.0") + acc.handle_message(f"{PREFIX}/upstream-lugs/exported-energy", "3000.0") - consumer.handle_message(f"{PREFIX}/downstream-lugs/direction", "DOWNSTREAM") - consumer.handle_message(f"{PREFIX}/downstream-lugs/active-power", "200.0") - consumer.handle_message(f"{PREFIX}/downstream-lugs/imported-energy", "40000.0") - consumer.handle_message(f"{PREFIX}/downstream-lugs/exported-energy", "500.0") + acc.handle_message(f"{PREFIX}/downstream-lugs/direction", "DOWNSTREAM") + acc.handle_message(f"{PREFIX}/downstream-lugs/active-power", "200.0") + acc.handle_message(f"{PREFIX}/downstream-lugs/imported-energy", "40000.0") + acc.handle_message(f"{PREFIX}/downstream-lugs/exported-energy", "500.0") snapshot = consumer.build_snapshot() assert snapshot.instant_grid_power_w == 800.0 @@ -594,27 +599,27 @@ def test_generic_lugs_with_direction_property(self): class TestHomieBattery: def test_battery_soc_soe(self): - consumer = _build_ready_consumer() - consumer.handle_message(f"{PREFIX}/bess-0/soc", "85.5") - consumer.handle_message(f"{PREFIX}/bess-0/soe", "10.2") + acc, consumer = _build_ready_consumer() + acc.handle_message(f"{PREFIX}/bess-0/soc", "85.5") + acc.handle_message(f"{PREFIX}/bess-0/soe", "10.2") snapshot = consumer.build_snapshot() assert snapshot.battery.soe_percentage == 85.5 assert snapshot.battery.soe_kwh == 10.2 def test_no_battery_node(self): - consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}}) + acc, consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}}) snapshot = consumer.build_snapshot() assert snapshot.battery.soe_percentage is None assert snapshot.battery.soe_kwh is None def test_battery_metadata(self): """BESS metadata properties are parsed into the battery snapshot.""" - consumer = _build_ready_consumer() - consumer.handle_message(f"{PREFIX}/bess-0/soc", "85.0") - consumer.handle_message(f"{PREFIX}/bess-0/vendor-name", "Tesla") - consumer.handle_message(f"{PREFIX}/bess-0/product-name", "Powerwall 3") - consumer.handle_message(f"{PREFIX}/bess-0/nameplate-capacity", "13.5") + acc, consumer = _build_ready_consumer() + acc.handle_message(f"{PREFIX}/bess-0/soc", "85.0") + acc.handle_message(f"{PREFIX}/bess-0/vendor-name", "Tesla") + acc.handle_message(f"{PREFIX}/bess-0/product-name", "Powerwall 3") + acc.handle_message(f"{PREFIX}/bess-0/nameplate-capacity", "13.5") snapshot = consumer.build_snapshot() assert snapshot.battery.vendor_name == "Tesla" @@ -623,8 +628,8 @@ def test_battery_metadata(self): def test_battery_metadata_absent(self): """BESS node without metadata properties has None values.""" - consumer = _build_ready_consumer() - consumer.handle_message(f"{PREFIX}/bess-0/soc", "50.0") + acc, consumer = _build_ready_consumer() + acc.handle_message(f"{PREFIX}/bess-0/soc", "50.0") snapshot = consumer.build_snapshot() assert snapshot.battery.soe_percentage == 50.0 @@ -642,7 +647,7 @@ class TestHomiePVMetadata: def test_pv_metadata_parsed(self): """PV metadata properties are parsed into the pv snapshot.""" circuit_uuid = "aabbccdd-1122-3344-5566-778899001122" - consumer = _build_ready_consumer( + acc, consumer = _build_ready_consumer( { "core": {"type": TYPE_CORE}, "bess-0": {"type": TYPE_BESS}, @@ -650,13 +655,13 @@ def test_pv_metadata_parsed(self): circuit_uuid: {"type": TYPE_CIRCUIT}, } ) - consumer.handle_message(f"{PREFIX}/pv-0/vendor-name", "Enphase") - consumer.handle_message(f"{PREFIX}/pv-0/product-name", "IQ8+") - consumer.handle_message(f"{PREFIX}/pv-0/nameplate-capacity", "3960") - consumer.handle_message(f"{PREFIX}/pv-0/feed", circuit_uuid) - consumer.handle_message(f"{PREFIX}/pv-0/relative-position", "IN_PANEL") - consumer.handle_message(f"{PREFIX}/{circuit_uuid}/name", "Solar") - consumer.handle_message(f"{PREFIX}/{circuit_uuid}/space", "30") + acc.handle_message(f"{PREFIX}/pv-0/vendor-name", "Enphase") + acc.handle_message(f"{PREFIX}/pv-0/product-name", "IQ8+") + acc.handle_message(f"{PREFIX}/pv-0/nameplate-capacity", "3960") + acc.handle_message(f"{PREFIX}/pv-0/feed", circuit_uuid) + acc.handle_message(f"{PREFIX}/pv-0/relative-position", "IN_PANEL") + acc.handle_message(f"{PREFIX}/{circuit_uuid}/name", "Solar") + acc.handle_message(f"{PREFIX}/{circuit_uuid}/space", "30") snapshot = consumer.build_snapshot() assert snapshot.pv.vendor_name == "Enphase" @@ -667,7 +672,7 @@ def test_pv_metadata_parsed(self): def test_no_pv_node(self): """Without PV node, pv snapshot has None values.""" - consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}}) + acc, consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}}) snapshot = consumer.build_snapshot() assert snapshot.pv.vendor_name is None assert snapshot.pv.product_name is None @@ -677,13 +682,13 @@ def test_no_pv_node(self): def test_pv_metadata_partial(self): """PV node with only some properties populated.""" - consumer = _build_ready_consumer( + acc, consumer = _build_ready_consumer( { "core": {"type": TYPE_CORE}, "pv-0": {"type": TYPE_PV}, } ) - consumer.handle_message(f"{PREFIX}/pv-0/vendor-name", "Other") + acc.handle_message(f"{PREFIX}/pv-0/vendor-name", "Other") snapshot = consumer.build_snapshot() assert snapshot.pv.vendor_name == "Other" @@ -701,17 +706,17 @@ def test_pv_metadata_partial(self): class TestHomiePowerFlows: def test_power_flows_parsed(self): """Power-flows node properties map to snapshot fields.""" - consumer = _build_ready_consumer( + acc, consumer = _build_ready_consumer( { "core": {"type": TYPE_CORE}, "power-flows": {"type": TYPE_POWER_FLOWS}, "bess-0": {"type": TYPE_BESS}, } ) - consumer.handle_message(f"{PREFIX}/power-flows/pv", "3500.0") - consumer.handle_message(f"{PREFIX}/power-flows/battery", "-1200.0") - consumer.handle_message(f"{PREFIX}/power-flows/grid", "800.0") - consumer.handle_message(f"{PREFIX}/power-flows/site", "3100.0") + acc.handle_message(f"{PREFIX}/power-flows/pv", "3500.0") + acc.handle_message(f"{PREFIX}/power-flows/battery", "-1200.0") + acc.handle_message(f"{PREFIX}/power-flows/grid", "800.0") + acc.handle_message(f"{PREFIX}/power-flows/site", "3100.0") snapshot = consumer.build_snapshot() assert snapshot.power_flow_pv == 3500.0 @@ -721,7 +726,7 @@ def test_power_flows_parsed(self): def test_no_power_flows_node(self): """Without power-flows node, fields are None.""" - consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}}) + acc, consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}}) snapshot = consumer.build_snapshot() assert snapshot.power_flow_pv is None assert snapshot.power_flow_battery is None @@ -730,13 +735,13 @@ def test_no_power_flows_node(self): def test_partial_power_flows(self): """Only populated properties get values; others remain None.""" - consumer = _build_ready_consumer( + acc, consumer = _build_ready_consumer( { "core": {"type": TYPE_CORE}, "power-flows": {"type": TYPE_POWER_FLOWS}, } ) - consumer.handle_message(f"{PREFIX}/power-flows/battery", "500.0") + acc.handle_message(f"{PREFIX}/power-flows/battery", "500.0") snapshot = consumer.build_snapshot() assert snapshot.power_flow_battery == 500.0 @@ -753,9 +758,9 @@ def test_partial_power_flows(self): class TestHomieLugsCurrent: def test_upstream_lugs_current(self): """l1-current and l2-current from upstream lugs map to snapshot.""" - consumer = _build_ready_consumer() - consumer.handle_message(f"{PREFIX}/lugs-upstream/l1-current", "45.2") - consumer.handle_message(f"{PREFIX}/lugs-upstream/l2-current", "42.8") + acc, consumer = _build_ready_consumer() + acc.handle_message(f"{PREFIX}/lugs-upstream/l1-current", "45.2") + acc.handle_message(f"{PREFIX}/lugs-upstream/l2-current", "42.8") snapshot = consumer.build_snapshot() assert snapshot.upstream_l1_current_a == 45.2 @@ -763,7 +768,7 @@ def test_upstream_lugs_current(self): def test_no_lugs_current(self): """Without l1/l2-current, fields are None.""" - consumer = _build_ready_consumer() + acc, consumer = _build_ready_consumer() snapshot = consumer.build_snapshot() assert snapshot.upstream_l1_current_a is None assert snapshot.upstream_l2_current_a is None @@ -777,17 +782,19 @@ def test_no_lugs_current(self): class TestHomiePanelSize: def test_panel_size_from_constructor(self): """panel_size in snapshot comes from constructor argument.""" - consumer = HomieDeviceConsumer(SERIAL, panel_size=32) - consumer.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) - consumer.handle_message(f"{PREFIX}/$description", _make_description(_core_description())) + acc_local = HomiePropertyAccumulator(SERIAL) + consumer = HomieDeviceConsumer(acc_local, panel_size=32) + acc_local.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) + acc_local.handle_message(f"{PREFIX}/$description", _make_description(_core_description())) snapshot = consumer.build_snapshot() assert snapshot.panel_size == 32 def test_panel_size_40(self): """Different panel sizes are propagated correctly.""" - consumer = HomieDeviceConsumer(SERIAL, panel_size=40) - consumer.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) - consumer.handle_message(f"{PREFIX}/$description", _make_description(_core_description())) + acc_local = HomiePropertyAccumulator(SERIAL) + consumer = HomieDeviceConsumer(acc_local, panel_size=40) + acc_local.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) + acc_local.handle_message(f"{PREFIX}/$description", _make_description(_core_description())) snapshot = consumer.build_snapshot() assert snapshot.panel_size == 40 @@ -797,12 +804,13 @@ def test_unmapped_tabs_use_panel_size(self): "core": {"type": TYPE_CORE}, "aaaaaaaa-1111-2222-3333-444444444444": {"type": TYPE_CIRCUIT}, } - consumer = HomieDeviceConsumer(SERIAL, panel_size=8) - consumer.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) - consumer.handle_message(f"{PREFIX}/$description", _make_description(nodes)) + acc_local = HomiePropertyAccumulator(SERIAL) + consumer = HomieDeviceConsumer(acc_local, panel_size=8) + acc_local.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) + acc_local.handle_message(f"{PREFIX}/$description", _make_description(nodes)) # Circuit at space 2 only — tabs 3-8 should be unmapped - consumer.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/space", "2") - consumer.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/dipole", "false") + acc_local.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/space", "2") + acc_local.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/dipole", "false") snapshot = consumer.build_snapshot() unmapped_ids = sorted(cid for cid in snapshot.circuits if cid.startswith("unmapped_tab_")) @@ -824,17 +832,80 @@ def test_unmapped_tabs_use_panel_size(self): class TestHomieSnapshot: def test_serial_number_preserved(self): - consumer = _build_ready_consumer() + acc, consumer = _build_ready_consumer() snapshot = consumer.build_snapshot() assert snapshot.serial_number == SERIAL def test_snapshot_immutable(self): - consumer = _build_ready_consumer() + acc, consumer = _build_ready_consumer() snapshot = consumer.build_snapshot() with pytest.raises(AttributeError): snapshot.serial_number = "other" +# --------------------------------------------------------------------------- +# HomieDeviceConsumer — snapshot caching +# --------------------------------------------------------------------------- + + +class TestSnapshotCaching: + def test_cached_snapshot_returned_when_clean(self): + acc, consumer = _build_ready_consumer() + node = "aabbccdd-1122-3344-5566-778899001122" + acc.handle_message(f"{PREFIX}/{node}/active-power", "-100.0") + snap1 = consumer.build_snapshot() + snap2 = consumer.build_snapshot() + assert snap1 is snap2 # exact same object + + def test_dirty_circuit_triggers_partial_rebuild(self): + acc, consumer = _build_ready_consumer() + node = "aabbccdd-1122-3344-5566-778899001122" + acc.handle_message(f"{PREFIX}/{node}/active-power", "-100.0") + snap1 = consumer.build_snapshot() + + acc.handle_message(f"{PREFIX}/{node}/active-power", "-200.0") + snap2 = consumer.build_snapshot() + + assert snap2 is not snap1 + circuit = snap2.circuits["aabbccdd112233445566778899001122"] + assert circuit.instant_power_w == 200.0 + assert snap2.firmware_version == snap1.firmware_version + + def test_dirty_core_triggers_full_rebuild(self): + acc, consumer = _build_ready_consumer() + acc.handle_message(f"{PREFIX}/core/software-version", "v1") + snap1 = consumer.build_snapshot() + + acc.handle_message(f"{PREFIX}/core/software-version", "v2") + snap2 = consumer.build_snapshot() + + assert snap2 is not snap1 + assert snap2.firmware_version == "v2" + + def test_target_change_marks_dirty(self): + acc, consumer = _build_ready_consumer() + node = "aabbccdd-1122-3344-5566-778899001122" + acc.handle_message(f"{PREFIX}/{node}/relay", "CLOSED") + snap1 = consumer.build_snapshot() + + acc.handle_message(f"{PREFIX}/{node}/relay/$target", "OPEN") + snap2 = consumer.build_snapshot() + + assert snap2 is not snap1 + circuit = snap2.circuits["aabbccdd112233445566778899001122"] + assert circuit.relay_state == "CLOSED" + assert circuit.relay_state_target == "OPEN" + + def test_description_change_triggers_full_rebuild(self): + acc, consumer = _build_ready_consumer() + snap1 = consumer.build_snapshot() + + # Re-sending description marks all dirty + acc.handle_message(f"{PREFIX}/$description", _make_description(_full_description())) + snap2 = consumer.build_snapshot() + assert snap2 is not snap1 + + # --------------------------------------------------------------------------- # HomieDeviceConsumer — property callbacks # --------------------------------------------------------------------------- @@ -842,39 +913,39 @@ def test_snapshot_immutable(self): class TestHomieCallbacks: def test_property_callback_fired(self): - consumer = _build_ready_consumer() + acc, consumer = _build_ready_consumer() calls = [] consumer.register_property_callback(lambda n, p, v, o: calls.append((n, p, v, o))) - consumer.handle_message(f"{PREFIX}/core/door", "OPEN") + acc.handle_message(f"{PREFIX}/core/door", "OPEN") assert len(calls) == 1 assert calls[0] == ("core", "door", "OPEN", None) def test_property_callback_with_old_value(self): - consumer = _build_ready_consumer() + acc, consumer = _build_ready_consumer() calls = [] consumer.register_property_callback(lambda n, p, v, o: calls.append((n, p, v, o))) - consumer.handle_message(f"{PREFIX}/core/door", "CLOSED") - consumer.handle_message(f"{PREFIX}/core/door", "OPEN") + acc.handle_message(f"{PREFIX}/core/door", "CLOSED") + acc.handle_message(f"{PREFIX}/core/door", "OPEN") assert calls[1] == ("core", "door", "OPEN", "CLOSED") def test_unregister_callback(self): - consumer = _build_ready_consumer() + acc, consumer = _build_ready_consumer() calls = [] unregister = consumer.register_property_callback(lambda n, p, v, o: calls.append(1)) - consumer.handle_message(f"{PREFIX}/core/door", "OPEN") + acc.handle_message(f"{PREFIX}/core/door", "OPEN") unregister() - consumer.handle_message(f"{PREFIX}/core/door", "CLOSED") + acc.handle_message(f"{PREFIX}/core/door", "CLOSED") assert len(calls) == 1 def test_callback_error_doesnt_crash(self): - consumer = _build_ready_consumer() + acc, consumer = _build_ready_consumer() def bad_cb(n, p, v, o): raise ValueError("boom") consumer.register_property_callback(bad_cb) # Should not raise - consumer.handle_message(f"{PREFIX}/core/door", "OPEN") + acc.handle_message(f"{PREFIX}/core/door", "OPEN") # --------------------------------------------------------------------------- @@ -944,7 +1015,8 @@ async def test_set_dominant_power_source_publishes(self): config = MqttClientConfig(broker_host="h", username="u", password="p") client = SpanMqttClient(host="192.168.1.1", serial_number=SERIAL, broker_config=config) - client._homie = HomieDeviceConsumer(SERIAL, panel_size=32) + client._accumulator = HomiePropertyAccumulator(SERIAL) + client._homie = HomieDeviceConsumer(client._accumulator, panel_size=32) # Populate the homie description so core node is known desc = _make_description(_core_description()) @@ -969,7 +1041,8 @@ async def test_set_dominant_power_source_no_core_node_raises(self): config = MqttClientConfig(broker_host="h", username="u", password="p") client = SpanMqttClient(host="192.168.1.1", serial_number=SERIAL, broker_config=config) - client._homie = HomieDeviceConsumer(SERIAL, panel_size=32) + client._accumulator = HomiePropertyAccumulator(SERIAL) + client._homie = HomieDeviceConsumer(client._accumulator, panel_size=32) # No description loaded — core node not found with pytest.raises(SpanPanelServerError, match="Core node not found"): @@ -988,7 +1061,8 @@ async def test_get_snapshot_returns_homie_state(self): config = MqttClientConfig(broker_host="h", username="u", password="p") client = SpanMqttClient(host="192.168.1.1", serial_number=SERIAL, broker_config=config) - client._homie = HomieDeviceConsumer(SERIAL, panel_size=32) + client._accumulator = HomiePropertyAccumulator(SERIAL) + client._homie = HomieDeviceConsumer(client._accumulator, panel_size=32) # Manually ready the homie consumer client._homie.handle_message(f"{PREFIX}/$state", "ready") @@ -1017,7 +1091,8 @@ async def test_ping_true_when_connected_and_ready(self): mock_bridge = MagicMock() mock_bridge.is_connected.return_value = True client._bridge = mock_bridge - client._homie = HomieDeviceConsumer(SERIAL, panel_size=32) + client._accumulator = HomiePropertyAccumulator(SERIAL) + client._homie = HomieDeviceConsumer(client._accumulator, panel_size=32) client._homie.handle_message(f"{PREFIX}/$state", "ready") client._homie.handle_message(f"{PREFIX}/$description", _make_description(_core_description())) @@ -1070,14 +1145,15 @@ async def test_start_stop_streaming(self): class TestHomieEdgeCases: def test_invalid_description_json(self): - consumer = HomieDeviceConsumer(SERIAL, panel_size=32) - consumer.handle_message(f"{PREFIX}/$state", "ready") - consumer.handle_message(f"{PREFIX}/$description", "not-json{{{") + acc_local = HomiePropertyAccumulator(SERIAL) + consumer = HomieDeviceConsumer(acc_local, panel_size=32) + acc_local.handle_message(f"{PREFIX}/$state", "ready") + acc_local.handle_message(f"{PREFIX}/$description", "not-json{{{") assert not consumer.is_ready() def test_empty_property_values(self): """Circuit with no properties should still build with defaults.""" - consumer = _build_ready_consumer() + acc, consumer = _build_ready_consumer() snapshot = consumer.build_snapshot() circuit = snapshot.circuits["aabbccdd112233445566778899001122"] assert circuit.name == "" @@ -1091,9 +1167,9 @@ def test_multiple_circuits(self): "aaaaaaaa-1111-2222-3333-444444444444": {"type": TYPE_CIRCUIT}, "bbbbbbbb-5555-6666-7777-888888888888": {"type": TYPE_CIRCUIT}, } - consumer = _build_ready_consumer(nodes) - consumer.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/name", "Circuit A") - consumer.handle_message(f"{PREFIX}/bbbbbbbb-5555-6666-7777-888888888888/name", "Circuit B") + acc, consumer = _build_ready_consumer(nodes) + acc.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/name", "Circuit A") + acc.handle_message(f"{PREFIX}/bbbbbbbb-5555-6666-7777-888888888888/name", "Circuit B") snapshot = consumer.build_snapshot() real_circuits = {k: v for k, v in snapshot.circuits.items() if not k.startswith("unmapped_tab_")} @@ -1102,7 +1178,7 @@ def test_multiple_circuits(self): assert snapshot.circuits["bbbbbbbb55556666777788888888888" + "8"].name == "Circuit B" def test_current_and_breaker_none_when_empty(self): - consumer = _build_ready_consumer() + acc, consumer = _build_ready_consumer() snapshot = consumer.build_snapshot() circuit = snapshot.circuits["aabbccdd112233445566778899001122"] assert circuit.current_a is None @@ -1127,10 +1203,10 @@ def test_single_pole_tabs(self): "core": {"type": TYPE_CORE}, "aaaaaaaa-1111-2222-3333-444444444444": {"type": TYPE_CIRCUIT}, } - consumer = _build_ready_consumer(nodes) + acc, consumer = _build_ready_consumer(nodes) node = "aaaaaaaa-1111-2222-3333-444444444444" - consumer.handle_message(f"{PREFIX}/{node}/space", "3") - consumer.handle_message(f"{PREFIX}/{node}/dipole", "false") + acc.handle_message(f"{PREFIX}/{node}/space", "3") + acc.handle_message(f"{PREFIX}/{node}/dipole", "false") snapshot = consumer.build_snapshot() circuit = snapshot.circuits["aaaaaaaa111122223333444444444444"] @@ -1142,10 +1218,10 @@ def test_dipole_tabs(self): "core": {"type": TYPE_CORE}, "aaaaaaaa-1111-2222-3333-444444444444": {"type": TYPE_CIRCUIT}, } - consumer = _build_ready_consumer(nodes) + acc, consumer = _build_ready_consumer(nodes) node = "aaaaaaaa-1111-2222-3333-444444444444" - consumer.handle_message(f"{PREFIX}/{node}/space", "11") - consumer.handle_message(f"{PREFIX}/{node}/dipole", "true") + acc.handle_message(f"{PREFIX}/{node}/space", "11") + acc.handle_message(f"{PREFIX}/{node}/dipole", "true") snapshot = consumer.build_snapshot() circuit = snapshot.circuits["aaaaaaaa111122223333444444444444"] @@ -1157,10 +1233,10 @@ def test_dipole_even_side(self): "core": {"type": TYPE_CORE}, "aaaaaaaa-1111-2222-3333-444444444444": {"type": TYPE_CIRCUIT}, } - consumer = _build_ready_consumer(nodes) + acc, consumer = _build_ready_consumer(nodes) node = "aaaaaaaa-1111-2222-3333-444444444444" - consumer.handle_message(f"{PREFIX}/{node}/space", "30") - consumer.handle_message(f"{PREFIX}/{node}/dipole", "true") + acc.handle_message(f"{PREFIX}/{node}/space", "30") + acc.handle_message(f"{PREFIX}/{node}/dipole", "true") snapshot = consumer.build_snapshot() circuit = snapshot.circuits["aaaaaaaa111122223333444444444444"] @@ -1174,15 +1250,16 @@ def test_unmapped_tabs_generated(self): "bbbbbbbb-5555-6666-7777-888888888888": {"type": TYPE_CIRCUIT}, } # Use panel_size=6 so the test is tractable - consumer = HomieDeviceConsumer(SERIAL, panel_size=6) - consumer.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) - consumer.handle_message(f"{PREFIX}/$description", _make_description(nodes)) + acc_local = HomiePropertyAccumulator(SERIAL) + consumer = HomieDeviceConsumer(acc_local, panel_size=6) + acc_local.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) + acc_local.handle_message(f"{PREFIX}/$description", _make_description(nodes)) # Circuit A at space 1 (single-pole) - consumer.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/space", "1") - consumer.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/dipole", "false") + acc_local.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/space", "1") + acc_local.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/dipole", "false") # Circuit B at space 3 (dipole → occupies 3 and 5) - consumer.handle_message(f"{PREFIX}/bbbbbbbb-5555-6666-7777-888888888888/space", "3") - consumer.handle_message(f"{PREFIX}/bbbbbbbb-5555-6666-7777-888888888888/dipole", "true") + acc_local.handle_message(f"{PREFIX}/bbbbbbbb-5555-6666-7777-888888888888/space", "3") + acc_local.handle_message(f"{PREFIX}/bbbbbbbb-5555-6666-7777-888888888888/dipole", "true") snapshot = consumer.build_snapshot() @@ -1201,11 +1278,12 @@ def test_unmapped_tab_properties(self): "core": {"type": TYPE_CORE}, "aaaaaaaa-1111-2222-3333-444444444444": {"type": TYPE_CIRCUIT}, } - consumer = HomieDeviceConsumer(SERIAL, panel_size=4) - consumer.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) - consumer.handle_message(f"{PREFIX}/$description", _make_description(nodes)) - consumer.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/space", "1") - consumer.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/dipole", "false") + acc_local = HomiePropertyAccumulator(SERIAL) + consumer = HomieDeviceConsumer(acc_local, panel_size=4) + acc_local.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) + acc_local.handle_message(f"{PREFIX}/$description", _make_description(nodes)) + acc_local.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/space", "1") + acc_local.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/dipole", "false") snapshot = consumer.build_snapshot() unmapped = snapshot.circuits["unmapped_tab_2"] @@ -1231,9 +1309,10 @@ def test_fully_occupied_panel_no_unmapped(self): "cccccccc-1111-2222-3333-444444444444": {"type": TYPE_CIRCUIT}, "dddddddd-5555-6666-7777-888888888888": {"type": TYPE_CIRCUIT}, } - consumer = HomieDeviceConsumer(SERIAL, panel_size=4) - consumer.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) - consumer.handle_message(f"{PREFIX}/$description", _make_description(nodes)) + acc_local = HomiePropertyAccumulator(SERIAL) + consumer = HomieDeviceConsumer(acc_local, panel_size=4) + acc_local.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) + acc_local.handle_message(f"{PREFIX}/$description", _make_description(nodes)) for i, node in enumerate( [ "aaaaaaaa-1111-2222-3333-444444444444", @@ -1243,8 +1322,8 @@ def test_fully_occupied_panel_no_unmapped(self): ], start=1, ): - consumer.handle_message(f"{PREFIX}/{node}/space", str(i)) - consumer.handle_message(f"{PREFIX}/{node}/dipole", "false") + acc_local.handle_message(f"{PREFIX}/{node}/space", str(i)) + acc_local.handle_message(f"{PREFIX}/{node}/dipole", "false") snapshot = consumer.build_snapshot() unmapped_ids = [cid for cid in snapshot.circuits if cid.startswith("unmapped_tab_")] @@ -1253,9 +1332,10 @@ def test_fully_occupied_panel_no_unmapped(self): def test_no_circuits_all_unmapped(self): """When no circuits exist, all positions up to panel_size are unmapped.""" nodes = {"core": {"type": TYPE_CORE}} - consumer = HomieDeviceConsumer(SERIAL, panel_size=4) - consumer.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) - consumer.handle_message(f"{PREFIX}/$description", _make_description(nodes)) + acc_local = HomiePropertyAccumulator(SERIAL) + consumer = HomieDeviceConsumer(acc_local, panel_size=4) + acc_local.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) + acc_local.handle_message(f"{PREFIX}/$description", _make_description(nodes)) snapshot = consumer.build_snapshot() unmapped_ids = sorted(cid for cid in snapshot.circuits if cid.startswith("unmapped_tab_")) assert unmapped_ids == [ @@ -1271,9 +1351,10 @@ def test_no_space_property_all_unmapped(self): "core": {"type": TYPE_CORE}, "aaaaaaaa-1111-2222-3333-444444444444": {"type": TYPE_CIRCUIT}, } - consumer = HomieDeviceConsumer(SERIAL, panel_size=4) - consumer.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) - consumer.handle_message(f"{PREFIX}/$description", _make_description(nodes)) + acc_local = HomiePropertyAccumulator(SERIAL) + consumer = HomieDeviceConsumer(acc_local, panel_size=4) + acc_local.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) + acc_local.handle_message(f"{PREFIX}/$description", _make_description(nodes)) # Don't set space property — circuit has no tabs snapshot = consumer.build_snapshot() unmapped_ids = sorted(cid for cid in snapshot.circuits if cid.startswith("unmapped_tab_")) @@ -1290,11 +1371,12 @@ def test_unmapped_fills_to_panel_size(self): "core": {"type": TYPE_CORE}, "aaaaaaaa-1111-2222-3333-444444444444": {"type": TYPE_CIRCUIT}, } - consumer = HomieDeviceConsumer(SERIAL, panel_size=8) - consumer.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) - consumer.handle_message(f"{PREFIX}/$description", _make_description(nodes)) - consumer.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/space", "2") - consumer.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/dipole", "false") + acc_local = HomiePropertyAccumulator(SERIAL) + consumer = HomieDeviceConsumer(acc_local, panel_size=8) + acc_local.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) + acc_local.handle_message(f"{PREFIX}/$description", _make_description(nodes)) + acc_local.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/space", "2") + acc_local.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/dipole", "false") snapshot = consumer.build_snapshot() # Occupied: {2}, unmapped: {1,3,4,5,6,7,8} @@ -1315,12 +1397,13 @@ def test_dipole_occupies_correct_tabs_in_unmapped_calc(self): "core": {"type": TYPE_CORE}, "aaaaaaaa-1111-2222-3333-444444444444": {"type": TYPE_CIRCUIT}, } - consumer = HomieDeviceConsumer(SERIAL, panel_size=4) - consumer.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) - consumer.handle_message(f"{PREFIX}/$description", _make_description(nodes)) + acc_local = HomiePropertyAccumulator(SERIAL) + consumer = HomieDeviceConsumer(acc_local, panel_size=4) + acc_local.handle_message(f"{PREFIX}/$state", HOMIE_STATE_READY) + acc_local.handle_message(f"{PREFIX}/$description", _make_description(nodes)) # Dipole at space 1 → occupies 1 and 3 - consumer.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/space", "1") - consumer.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/dipole", "true") + acc_local.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/space", "1") + acc_local.handle_message(f"{PREFIX}/aaaaaaaa-1111-2222-3333-444444444444/dipole", "true") snapshot = consumer.build_snapshot() # panel_size=4, occupied: {1, 3}, unmapped: {2, 4} @@ -1339,22 +1422,22 @@ class TestHomieEVSEMetadata: def test_evse_metadata_parsed(self): """All 9 EVSE properties are extracted into the snapshot.""" circuit_uuid = "aabbccdd-1122-3344-5566-778899001122" - consumer = _build_ready_consumer( + acc, consumer = _build_ready_consumer( { "core": {"type": TYPE_CORE}, circuit_uuid: {"type": TYPE_CIRCUIT}, "evse-0": {"type": TYPE_EVSE}, } ) - consumer.handle_message(f"{PREFIX}/evse-0/feed", circuit_uuid) - consumer.handle_message(f"{PREFIX}/evse-0/status", "CHARGING") - consumer.handle_message(f"{PREFIX}/evse-0/lock-state", "LOCKED") - consumer.handle_message(f"{PREFIX}/evse-0/advertised-current", "32.0") - consumer.handle_message(f"{PREFIX}/evse-0/vendor-name", "SPAN") - consumer.handle_message(f"{PREFIX}/evse-0/product-name", "SPAN Drive") - consumer.handle_message(f"{PREFIX}/evse-0/part-number", "SPN-DRV-001") - consumer.handle_message(f"{PREFIX}/evse-0/serial-number", "SN12345") - consumer.handle_message(f"{PREFIX}/evse-0/software-version", "2.1.0") + acc.handle_message(f"{PREFIX}/evse-0/feed", circuit_uuid) + acc.handle_message(f"{PREFIX}/evse-0/status", "CHARGING") + acc.handle_message(f"{PREFIX}/evse-0/lock-state", "LOCKED") + acc.handle_message(f"{PREFIX}/evse-0/advertised-current", "32.0") + acc.handle_message(f"{PREFIX}/evse-0/vendor-name", "SPAN") + acc.handle_message(f"{PREFIX}/evse-0/product-name", "SPAN Drive") + acc.handle_message(f"{PREFIX}/evse-0/part-number", "SPN-DRV-001") + acc.handle_message(f"{PREFIX}/evse-0/serial-number", "SN12345") + acc.handle_message(f"{PREFIX}/evse-0/software-version", "2.1.0") snapshot = consumer.build_snapshot() assert "evse-0" in snapshot.evse @@ -1374,7 +1457,7 @@ def test_evse_multiple_devices(self): """Two EVSE nodes produce two snapshot entries.""" circ_a = "aaaaaaaa-1111-2222-3333-444444444444" circ_b = "bbbbbbbb-1111-2222-3333-444444444444" - consumer = _build_ready_consumer( + acc, consumer = _build_ready_consumer( { "core": {"type": TYPE_CORE}, circ_a: {"type": TYPE_CIRCUIT}, @@ -1383,10 +1466,10 @@ def test_evse_multiple_devices(self): "evse-1": {"type": TYPE_EVSE}, } ) - consumer.handle_message(f"{PREFIX}/evse-0/feed", circ_a) - consumer.handle_message(f"{PREFIX}/evse-0/status", "CHARGING") - consumer.handle_message(f"{PREFIX}/evse-1/feed", circ_b) - consumer.handle_message(f"{PREFIX}/evse-1/status", "AVAILABLE") + acc.handle_message(f"{PREFIX}/evse-0/feed", circ_a) + acc.handle_message(f"{PREFIX}/evse-0/status", "CHARGING") + acc.handle_message(f"{PREFIX}/evse-1/feed", circ_b) + acc.handle_message(f"{PREFIX}/evse-1/status", "AVAILABLE") snapshot = consumer.build_snapshot() assert len(snapshot.evse) == 2 @@ -1395,22 +1478,22 @@ def test_evse_multiple_devices(self): def test_evse_no_node(self): """Empty dict when no EVSE commissioned.""" - consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}}) + acc, consumer = _build_ready_consumer({"core": {"type": TYPE_CORE}}) snapshot = consumer.build_snapshot() assert snapshot.evse == {} def test_evse_partial_metadata(self): """Missing optional fields are None.""" circuit_uuid = "aabbccdd-1122-3344-5566-778899001122" - consumer = _build_ready_consumer( + acc, consumer = _build_ready_consumer( { "core": {"type": TYPE_CORE}, circuit_uuid: {"type": TYPE_CIRCUIT}, "evse-0": {"type": TYPE_EVSE}, } ) - consumer.handle_message(f"{PREFIX}/evse-0/feed", circuit_uuid) - consumer.handle_message(f"{PREFIX}/evse-0/status", "AVAILABLE") + acc.handle_message(f"{PREFIX}/evse-0/feed", circuit_uuid) + acc.handle_message(f"{PREFIX}/evse-0/status", "AVAILABLE") snapshot = consumer.build_snapshot() evse = snapshot.evse["evse-0"] @@ -1426,18 +1509,18 @@ def test_evse_partial_metadata(self): def test_evse_feed_still_annotates_circuit(self): """Existing feed annotation still works alongside new EVSE snapshot.""" circuit_uuid = "aabbccdd-1122-3344-5566-778899001122" - consumer = _build_ready_consumer( + acc, consumer = _build_ready_consumer( { "core": {"type": TYPE_CORE}, circuit_uuid: {"type": TYPE_CIRCUIT}, "evse-0": {"type": TYPE_EVSE}, } ) - consumer.handle_message(f"{PREFIX}/evse-0/feed", circuit_uuid) - consumer.handle_message(f"{PREFIX}/evse-0/relative-position", "IN_PANEL") - consumer.handle_message(f"{PREFIX}/evse-0/status", "CHARGING") - consumer.handle_message(f"{PREFIX}/{circuit_uuid}/name", "EV Charger") - consumer.handle_message(f"{PREFIX}/{circuit_uuid}/space", "10") + acc.handle_message(f"{PREFIX}/evse-0/feed", circuit_uuid) + acc.handle_message(f"{PREFIX}/evse-0/relative-position", "IN_PANEL") + acc.handle_message(f"{PREFIX}/evse-0/status", "CHARGING") + acc.handle_message(f"{PREFIX}/{circuit_uuid}/name", "EV Charger") + acc.handle_message(f"{PREFIX}/{circuit_uuid}/space", "10") snapshot = consumer.build_snapshot() # Circuit should be annotated with device_type=evse diff --git a/uv.lock b/uv.lock index bdcbf06..7abcd7b 100644 --- a/uv.lock +++ b/uv.lock @@ -1310,7 +1310,7 @@ wheels = [ [[package]] name = "span-panel-api" -version = "2.4.2" +version = "2.5.0" source = { editable = "." } dependencies = [ { name = "httpx" },