diff --git a/tests/actor/test_resampling.py b/tests/actor/test_resampling.py index 9b15339de..f5ea72ade 100644 --- a/tests/actor/test_resampling.py +++ b/tests/actor/test_resampling.py @@ -184,3 +184,48 @@ async def test_duplicate_request( ) await resampling_actor._resampler.stop() # pylint: disable=protected-access + + +async def test_stalled_request_does_not_block_later_subscriptions() -> None: + """Ensure resampling does not head-of-line block.""" + channel_registry = ChannelRegistry(name="test") + data_source_req_chan = Broadcast[ComponentMetricRequest](name="data-source-req") + data_source_req_recv = data_source_req_chan.new_receiver() + resampling_req_chan = Broadcast[ComponentMetricRequest](name="resample-req") + resampling_req_sender = resampling_req_chan.new_sender() + + async with ComponentMetricsResamplingActor( + channel_registry=channel_registry, + data_sourcing_request_sender=data_source_req_chan.new_sender(), + resampling_request_receiver=resampling_req_chan.new_receiver(), + config=ResamplerConfig2( + resampling_period=timedelta(seconds=0.2), + max_data_age_in_periods=2, + ), + ): + first_request = ComponentMetricRequest( + namespace="Resampling-A", + component_id=ComponentId(9), + metric=Metric.BATTERY_SOC_PCT, + start_time=None, + ) + second_request = ComponentMetricRequest( + namespace="Resampling-B", + component_id=ComponentId(10), + metric=Metric.AC_ACTIVE_POWER, + start_time=None, + ) + + await resampling_req_sender.send(first_request) + stalled_data_source_request = await data_source_req_recv.receive() + assert stalled_data_source_request == dataclasses.replace( + first_request, namespace="Resampling-A:Source" + ) + + await resampling_req_sender.send(second_request) + later_data_source_request = await asyncio.wait_for( + data_source_req_recv.receive(), timeout=0.1 + ) + assert later_data_source_request == dataclasses.replace( + second_request, namespace="Resampling-B:Source" + ) diff --git a/tests/microgrid/test_data_sourcing.py b/tests/microgrid/test_data_sourcing.py index bcc4e5857..310603ba7 100644 --- a/tests/microgrid/test_data_sourcing.py +++ b/tests/microgrid/test_data_sourcing.py @@ -38,6 +38,8 @@ ) from frequenz.sdk.timeseries import Sample +from ..utils.receive_timeout import Timeout, receive_timeout + T = TypeVar("T", bound=ComponentData) _MICROGRID_ID = MicrogridId(1) @@ -168,6 +170,109 @@ async def test_data_sourcing_actor( # pylint: disable=too-many-locals assert -13.0 + i == sample.value.base_value +async def test_duplicate_requests_do_not_block_new_receivers( + mock_connection_manager: mock.Mock, # pylint: disable=redefined-outer-name,unused-argument +) -> None: + """Ensure duplicate direct subscriptions still deliver samples.""" + req_chan = Broadcast[ComponentMetricRequest](name="data_sourcing_requests") + req_sender = req_chan.new_sender() + registry = ChannelRegistry(name="test-registry") + + request = ComponentMetricRequest( + "test-namespace", + ComponentId(4), + Metric.AC_ACTIVE_POWER, + None, + ) + + async with DataSourcingActor(req_chan.new_receiver(), registry): + first_receiver = registry.get_or_create( + Sample[Quantity], request.get_channel_name() + ).new_receiver() + await req_sender.send(request) + + first_sample = await receive_timeout(first_receiver, timeout=1.0) + assert first_sample is not Timeout + + second_receiver = registry.get_or_create( + Sample[Quantity], request.get_channel_name() + ).new_receiver() + await req_sender.send(request) + + second_sample = await receive_timeout(second_receiver, timeout=1.0) + assert second_sample is not Timeout + + +async def test_unknown_component_request_does_not_block_later_valid_request( + mock_connection_manager: mock.Mock, # pylint: disable=redefined-outer-name,unused-argument +) -> None: + """Ensure unknown component requests don't block later valid streams.""" + req_chan = Broadcast[ComponentMetricRequest](name="data_sourcing_requests") + req_sender = req_chan.new_sender() + registry = ChannelRegistry(name="test-registry") + + unknown_request = ComponentMetricRequest( + "unknown-component", + ComponentId(999), + Metric.AC_ACTIVE_POWER, + None, + ) + valid_request = ComponentMetricRequest( + "valid-component", + ComponentId(4), + Metric.AC_ACTIVE_POWER, + None, + ) + + async with DataSourcingActor(req_chan.new_receiver(), registry): + valid_receiver = registry.get_or_create( + Sample[Quantity], valid_request.get_channel_name() + ).new_receiver() + + await req_sender.send(unknown_request) + await req_sender.send(valid_request) + + valid_sample = await receive_timeout(valid_receiver, timeout=1.0) + assert valid_sample is not Timeout + + +@pytest.mark.skip( + reason="This is currently failing, but we are probably not going to " + "fix it since the data sourcing actor will likely go away." +) +async def test_invalid_metric_request_does_not_block_later_valid_request( + mock_connection_manager: mock.Mock, # pylint: disable=redefined-outer-name,unused-argument +) -> None: + """Ensure a bad request doesn't poison later valid subscriptions.""" + req_chan = Broadcast[ComponentMetricRequest](name="data_sourcing_requests") + req_sender = req_chan.new_sender() + registry = ChannelRegistry(name="test-registry") + + invalid_request = ComponentMetricRequest( + "invalid-metric", + ComponentId(4), + Metric.BATTERY_SOC_PCT, + None, + ) + valid_request = ComponentMetricRequest( + "valid-metric", + ComponentId(4), + Metric.AC_ACTIVE_POWER, + None, + ) + + async with DataSourcingActor(req_chan.new_receiver(), registry): + valid_receiver = registry.get_or_create( + Sample[Quantity], valid_request.get_channel_name() + ).new_receiver() + + await req_sender.send(invalid_request) + await req_sender.send(valid_request) + + valid_sample = await receive_timeout(valid_receiver, timeout=1.0) + assert valid_sample is not Timeout + + def _new_meter_data( component_id: ComponentId, timestamp: datetime, value: float ) -> MeterData: diff --git a/tests/timeseries/_battery_pool/test_battery_pool.py b/tests/timeseries/_battery_pool/test_battery_pool.py index 783fedb6c..489cb6746 100644 --- a/tests/timeseries/_battery_pool/test_battery_pool.py +++ b/tests/timeseries/_battery_pool/test_battery_pool.py @@ -15,17 +15,19 @@ from dataclasses import dataclass, is_dataclass, replace from datetime import datetime, timedelta, timezone from typing import Any, Generic, TypeVar +from unittest.mock import MagicMock import async_solipsism import pytest import time_machine -from frequenz.channels import Receiver, Sender +from frequenz.channels import Broadcast, Receiver, Sender from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.component import Battery, Component from frequenz.quantities import Energy, Percentage, Power, Temperature from pytest_mock import MockerFixture -from frequenz.sdk import microgrid +from frequenz.sdk import microgrid, timeseries +from frequenz.sdk._internal._channels import ChannelRegistry from frequenz.sdk._internal._constants import ( MAX_BATTERY_DATA_AGE_SEC, WAIT_FOR_COMPONENT_DATA_SEC, @@ -34,9 +36,13 @@ from frequenz.sdk.microgrid._power_distributing._component_managers._battery_manager import ( _get_battery_inverter_mappings, ) +from frequenz.sdk.microgrid._power_managing import ReportRequest, _Report from frequenz.sdk.timeseries import Bounds, ResamplerConfig2, Sample from frequenz.sdk.timeseries._base_types import SystemBounds from frequenz.sdk.timeseries.battery_pool import BatteryPool +from frequenz.sdk.timeseries.battery_pool._battery_pool_reference_store import ( + BatteryPoolReferenceStore, +) from ...timeseries.mock_microgrid import MockMicrogrid from ...utils.component_data_streamer import MockComponentDataStreamer @@ -50,6 +56,16 @@ _logger = logging.getLogger(__name__) +def _new_power_status_report(target_power_watts: float) -> _Report: + """Create a distinct report for power status assertions.""" + target_power = Power.from_watts(target_power_watts) + return _Report( + target_power=target_power, + _inclusion_bounds=timeseries.Bounds(target_power, target_power), + _exclusion_bounds=None, + ) + + @pytest.fixture(autouse=True) def event_loop_policy() -> async_solipsism.EventLoopPolicy: """Return an event loop policy that uses the async solipsism event loop.""" @@ -1200,3 +1216,58 @@ async def run_temperature_test( # pylint: disable=too-many-locals streamer.start_streaming(latest_data, sampling_rate=0.1) msg = await asyncio.wait_for(receiver.receive(), timeout=waiting_time_sec) compare_messages(msg, Sample(now, Temperature.from_celsius(15.0))) + + +async def test_power_status_same_instance_subscriptions_work( + mocker: MockerFixture, +) -> None: + """Ensure same-instance power_status subscriptions share the same channel.""" + mock_cm = MagicMock() + mock_graph = MagicMock() + mock_graph.components.return_value = [ + MagicMock(id=ComponentId(8)), + MagicMock(id=ComponentId(18)), + ] + mock_cm.component_graph = mock_graph + mocker.patch( + "frequenz.sdk.microgrid.connection_manager._CONNECTION_MANAGER", + mock_cm, + ) + mocker.patch("frequenz.sdk.microgrid.connection_manager.get", return_value=mock_cm) + + registry = ChannelRegistry(name="battery-pool-test") + requests_channel = Broadcast[ReportRequest](name="battery-pool-requests") + requests_rx = requests_channel.new_receiver() + component_ids = frozenset({ComponentId(8), ComponentId(18)}) + pool = BatteryPool( + pool_ref_store=BatteryPoolReferenceStore( + channel_registry=registry, + resampler_subscription_sender=MagicMock(), + batteries_status_receiver=MagicMock(), + power_manager_requests_sender=MagicMock(), + power_manager_bounds_subscription_sender=requests_channel.new_sender(), + power_distribution_results_fetcher=MagicMock(), + min_update_interval=timedelta(seconds=1), + batteries_id=component_ids, + ), + name="battery-pool", + priority=5, + ) + + first_status_rx = pool.power_status.new_receiver() + second_status_rx = pool.power_status.new_receiver() + + await asyncio.sleep(0) + + first_request = await asyncio.wait_for(requests_rx.receive(), timeout=1.0) + second_request = await asyncio.wait_for(requests_rx.receive(), timeout=1.0) + assert second_request.get_channel_name() == first_request.get_channel_name() + + await registry.get_or_create( + _Report, first_request.get_channel_name() + ).new_sender().send(_new_power_status_report(123.0)) + + first_report = await asyncio.wait_for(first_status_rx.receive(), timeout=1.0) + second_report = await asyncio.wait_for(second_status_rx.receive(), timeout=1.0) + assert first_report.target_power == Power.from_watts(123.0) + assert second_report.target_power == Power.from_watts(123.0) diff --git a/tests/timeseries/_ev_charger_pool/test_ev_charger_pool.py b/tests/timeseries/_ev_charger_pool/test_ev_charger_pool.py index 113908923..4a7c1ed71 100644 --- a/tests/timeseries/_ev_charger_pool/test_ev_charger_pool.py +++ b/tests/timeseries/_ev_charger_pool/test_ev_charger_pool.py @@ -4,13 +4,34 @@ """Tests for the `EVChargerPool`.""" +import asyncio +from unittest.mock import MagicMock + +from frequenz.channels import Broadcast +from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Power from pytest_mock import MockerFixture -from frequenz.sdk import microgrid +from frequenz.sdk import microgrid, timeseries +from frequenz.sdk._internal._channels import ChannelRegistry +from frequenz.sdk.microgrid._power_managing import ReportRequest, _Report +from frequenz.sdk.timeseries.ev_charger_pool import EVChargerPool +from frequenz.sdk.timeseries.ev_charger_pool._ev_charger_pool_reference_store import ( + EVChargerPoolReferenceStore, +) from tests.timeseries.mock_microgrid import MockMicrogrid +def _new_power_status_report(target_power_watts: float) -> _Report: + """Create a distinct report for power status assertions.""" + target_power = Power.from_watts(target_power_watts) + return _Report( + target_power=target_power, + _inclusion_bounds=timeseries.Bounds(target_power, target_power), + _exclusion_bounds=None, + ) + + class TestEVChargerPool: """Tests for the `EVChargerPool`.""" @@ -28,3 +49,57 @@ async def test_ev_power( # pylint: disable=too-many-locals await mockgrid.mock_resampler.send_meter_power([16.0]) assert (await power_receiver.receive()).value == Power.from_watts(16.0) + + +async def test_power_status_same_instance_subscriptions_work( + mocker: MockerFixture, +) -> None: + """Ensure same-instance power_status subscriptions share the same channel.""" + mock_cm = MagicMock() + mock_graph = MagicMock() + mock_graph.components.return_value = [ + MagicMock(id=ComponentId(12)), + MagicMock(id=ComponentId(22)), + ] + mock_cm.component_graph = mock_graph + mocker.patch( + "frequenz.sdk.microgrid.connection_manager._CONNECTION_MANAGER", + mock_cm, + ) + mocker.patch("frequenz.sdk.microgrid.connection_manager.get", return_value=mock_cm) + + registry = ChannelRegistry(name="ev-pool-test") + requests_channel = Broadcast[ReportRequest](name="ev-pool-requests") + requests_rx = requests_channel.new_receiver() + component_ids = frozenset({ComponentId(12), ComponentId(22)}) + pool = EVChargerPool( + pool_ref_store=EVChargerPoolReferenceStore( + channel_registry=registry, + resampler_subscription_sender=MagicMock(), + status_receiver=MagicMock(), + power_manager_requests_sender=MagicMock(), + power_manager_bounds_subs_sender=requests_channel.new_sender(), + power_distribution_results_fetcher=MagicMock(), + component_ids=component_ids, + ), + name="ev-pool", + priority=5, + ) + + first_status_rx = pool.power_status.new_receiver() + second_status_rx = pool.power_status.new_receiver() + + await asyncio.sleep(0) + + first_request = await asyncio.wait_for(requests_rx.receive(), timeout=1.0) + second_request = await asyncio.wait_for(requests_rx.receive(), timeout=1.0) + assert second_request.get_channel_name() == first_request.get_channel_name() + + await registry.get_or_create( + _Report, first_request.get_channel_name() + ).new_sender().send(_new_power_status_report(123.0)) + + first_report = await asyncio.wait_for(first_status_rx.receive(), timeout=1.0) + second_report = await asyncio.wait_for(second_status_rx.receive(), timeout=1.0) + assert first_report.target_power == Power.from_watts(123.0) + assert second_report.target_power == Power.from_watts(123.0) diff --git a/tests/timeseries/_pv_pool/test_pv_pool.py b/tests/timeseries/_pv_pool/test_pv_pool.py new file mode 100644 index 000000000..51dcae8bd --- /dev/null +++ b/tests/timeseries/_pv_pool/test_pv_pool.py @@ -0,0 +1,84 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Tests for the `PVPool`.""" + +import asyncio +from unittest.mock import MagicMock + +from frequenz.channels import Broadcast +from frequenz.client.common.microgrid.components import ComponentId +from frequenz.quantities import Power +from pytest_mock import MockerFixture + +from frequenz.sdk import timeseries +from frequenz.sdk._internal._channels import ChannelRegistry +from frequenz.sdk.microgrid._power_managing import ReportRequest, _Report +from frequenz.sdk.timeseries.pv_pool import PVPool +from frequenz.sdk.timeseries.pv_pool._pv_pool_reference_store import ( + PVPoolReferenceStore, +) + + +def _new_power_status_report(target_power_watts: float) -> _Report: + """Create a distinct report for power status assertions.""" + target_power = Power.from_watts(target_power_watts) + return _Report( + target_power=target_power, + _inclusion_bounds=timeseries.Bounds(target_power, target_power), + _exclusion_bounds=None, + ) + + +async def test_power_status_same_instance_subscriptions_work( + mocker: MockerFixture, +) -> None: + """Ensure same-instance power_status subscriptions share the same channel.""" + mock_cm = MagicMock() + mock_graph = MagicMock() + mock_graph.components.return_value = [ + MagicMock(id=ComponentId(28)), + MagicMock(id=ComponentId(38)), + ] + mock_cm.component_graph = mock_graph + mocker.patch( + "frequenz.sdk.microgrid.connection_manager._CONNECTION_MANAGER", + mock_cm, + ) + mocker.patch("frequenz.sdk.microgrid.connection_manager.get", return_value=mock_cm) + + registry = ChannelRegistry(name="pv-pool-test") + requests_channel = Broadcast[ReportRequest](name="pv-pool-requests") + requests_rx = requests_channel.new_receiver() + component_ids = frozenset({ComponentId(28), ComponentId(38)}) + pool = PVPool( + pool_ref_store=PVPoolReferenceStore( + channel_registry=registry, + resampler_subscription_sender=MagicMock(), + status_receiver=MagicMock(), + power_manager_requests_sender=MagicMock(), + power_manager_bounds_subs_sender=requests_channel.new_sender(), + power_distribution_results_fetcher=MagicMock(), + component_ids=component_ids, + ), + name="pv-pool", + priority=5, + ) + + first_status_rx = pool.power_status.new_receiver() + second_status_rx = pool.power_status.new_receiver() + + await asyncio.sleep(0) + + first_request = await asyncio.wait_for(requests_rx.receive(), timeout=1.0) + second_request = await asyncio.wait_for(requests_rx.receive(), timeout=1.0) + assert second_request.get_channel_name() == first_request.get_channel_name() + + await registry.get_or_create( + _Report, first_request.get_channel_name() + ).new_sender().send(_new_power_status_report(123.0)) + + first_report = await asyncio.wait_for(first_status_rx.receive(), timeout=1.0) + second_report = await asyncio.wait_for(second_status_rx.receive(), timeout=1.0) + assert first_report.target_power == Power.from_watts(123.0) + assert second_report.target_power == Power.from_watts(123.0)