Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions tests/actor/test_resampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,48 @@ async def test_duplicate_request(
)

await resampling_actor._resampler.stop() # pylint: disable=protected-access


async def test_stalled_request_does_not_block_later_subscriptions() -> None:
"""Ensure resampling does not head-of-line block."""
channel_registry = ChannelRegistry(name="test")
data_source_req_chan = Broadcast[ComponentMetricRequest](name="data-source-req")
data_source_req_recv = data_source_req_chan.new_receiver()
resampling_req_chan = Broadcast[ComponentMetricRequest](name="resample-req")
resampling_req_sender = resampling_req_chan.new_sender()

async with ComponentMetricsResamplingActor(
channel_registry=channel_registry,
data_sourcing_request_sender=data_source_req_chan.new_sender(),
resampling_request_receiver=resampling_req_chan.new_receiver(),
config=ResamplerConfig2(
resampling_period=timedelta(seconds=0.2),
max_data_age_in_periods=2,
),
):
first_request = ComponentMetricRequest(
namespace="Resampling-A",
component_id=ComponentId(9),
metric=Metric.BATTERY_SOC_PCT,
start_time=None,
)
second_request = ComponentMetricRequest(
namespace="Resampling-B",
component_id=ComponentId(10),
metric=Metric.AC_ACTIVE_POWER,
start_time=None,
)

await resampling_req_sender.send(first_request)
stalled_data_source_request = await data_source_req_recv.receive()
assert stalled_data_source_request == dataclasses.replace(
first_request, namespace="Resampling-A:Source"
)

await resampling_req_sender.send(second_request)
later_data_source_request = await asyncio.wait_for(
data_source_req_recv.receive(), timeout=0.1
)
assert later_data_source_request == dataclasses.replace(
second_request, namespace="Resampling-B:Source"
)
105 changes: 105 additions & 0 deletions tests/microgrid/test_data_sourcing.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
)
from frequenz.sdk.timeseries import Sample

from ..utils.receive_timeout import Timeout, receive_timeout

T = TypeVar("T", bound=ComponentData)

_MICROGRID_ID = MicrogridId(1)
Expand Down Expand Up @@ -168,6 +170,109 @@ async def test_data_sourcing_actor( # pylint: disable=too-many-locals
assert -13.0 + i == sample.value.base_value


async def test_duplicate_requests_do_not_block_new_receivers(
mock_connection_manager: mock.Mock, # pylint: disable=redefined-outer-name,unused-argument
) -> None:
"""Ensure duplicate direct subscriptions still deliver samples."""
req_chan = Broadcast[ComponentMetricRequest](name="data_sourcing_requests")
req_sender = req_chan.new_sender()
registry = ChannelRegistry(name="test-registry")

request = ComponentMetricRequest(
"test-namespace",
ComponentId(4),
Metric.AC_ACTIVE_POWER,
None,
)

async with DataSourcingActor(req_chan.new_receiver(), registry):
first_receiver = registry.get_or_create(
Sample[Quantity], request.get_channel_name()
).new_receiver()
await req_sender.send(request)

first_sample = await receive_timeout(first_receiver, timeout=1.0)
assert first_sample is not Timeout

second_receiver = registry.get_or_create(
Sample[Quantity], request.get_channel_name()
).new_receiver()
await req_sender.send(request)

second_sample = await receive_timeout(second_receiver, timeout=1.0)
assert second_sample is not Timeout


async def test_unknown_component_request_does_not_block_later_valid_request(
mock_connection_manager: mock.Mock, # pylint: disable=redefined-outer-name,unused-argument
) -> None:
"""Ensure unknown component requests don't block later valid streams."""
req_chan = Broadcast[ComponentMetricRequest](name="data_sourcing_requests")
req_sender = req_chan.new_sender()
registry = ChannelRegistry(name="test-registry")

unknown_request = ComponentMetricRequest(
"unknown-component",
ComponentId(999),
Metric.AC_ACTIVE_POWER,
None,
)
valid_request = ComponentMetricRequest(
"valid-component",
ComponentId(4),
Metric.AC_ACTIVE_POWER,
None,
)

async with DataSourcingActor(req_chan.new_receiver(), registry):
valid_receiver = registry.get_or_create(
Sample[Quantity], valid_request.get_channel_name()
).new_receiver()

await req_sender.send(unknown_request)
await req_sender.send(valid_request)

valid_sample = await receive_timeout(valid_receiver, timeout=1.0)
assert valid_sample is not Timeout


@pytest.mark.skip(
reason="This is currently failing, but we are probably not going to "
"fix it since the data sourcing actor will likely go away."
)
async def test_invalid_metric_request_does_not_block_later_valid_request(
mock_connection_manager: mock.Mock, # pylint: disable=redefined-outer-name,unused-argument
) -> None:
"""Ensure a bad request doesn't poison later valid subscriptions."""
req_chan = Broadcast[ComponentMetricRequest](name="data_sourcing_requests")
req_sender = req_chan.new_sender()
registry = ChannelRegistry(name="test-registry")

invalid_request = ComponentMetricRequest(
"invalid-metric",
ComponentId(4),
Metric.BATTERY_SOC_PCT,
None,
)
valid_request = ComponentMetricRequest(
"valid-metric",
ComponentId(4),
Metric.AC_ACTIVE_POWER,
None,
)

async with DataSourcingActor(req_chan.new_receiver(), registry):
valid_receiver = registry.get_or_create(
Sample[Quantity], valid_request.get_channel_name()
).new_receiver()

await req_sender.send(invalid_request)
await req_sender.send(valid_request)

valid_sample = await receive_timeout(valid_receiver, timeout=1.0)
assert valid_sample is not Timeout


def _new_meter_data(
component_id: ComponentId, timestamp: datetime, value: float
) -> MeterData:
Expand Down
75 changes: 73 additions & 2 deletions tests/timeseries/_battery_pool/test_battery_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,19 @@
from dataclasses import dataclass, is_dataclass, replace
from datetime import datetime, timedelta, timezone
from typing import Any, Generic, TypeVar
from unittest.mock import MagicMock

import async_solipsism
import pytest
import time_machine
from frequenz.channels import Receiver, Sender
from frequenz.channels import Broadcast, Receiver, Sender
from frequenz.client.common.microgrid.components import ComponentId
from frequenz.client.microgrid.component import Battery, Component
from frequenz.quantities import Energy, Percentage, Power, Temperature
from pytest_mock import MockerFixture

from frequenz.sdk import microgrid
from frequenz.sdk import microgrid, timeseries
from frequenz.sdk._internal._channels import ChannelRegistry
from frequenz.sdk._internal._constants import (
MAX_BATTERY_DATA_AGE_SEC,
WAIT_FOR_COMPONENT_DATA_SEC,
Expand All @@ -34,9 +36,13 @@
from frequenz.sdk.microgrid._power_distributing._component_managers._battery_manager import (
_get_battery_inverter_mappings,
)
from frequenz.sdk.microgrid._power_managing import ReportRequest, _Report
from frequenz.sdk.timeseries import Bounds, ResamplerConfig2, Sample
from frequenz.sdk.timeseries._base_types import SystemBounds
from frequenz.sdk.timeseries.battery_pool import BatteryPool
from frequenz.sdk.timeseries.battery_pool._battery_pool_reference_store import (
BatteryPoolReferenceStore,
)

from ...timeseries.mock_microgrid import MockMicrogrid
from ...utils.component_data_streamer import MockComponentDataStreamer
Expand All @@ -50,6 +56,16 @@
_logger = logging.getLogger(__name__)


def _new_power_status_report(target_power_watts: float) -> _Report:
"""Create a distinct report for power status assertions."""
target_power = Power.from_watts(target_power_watts)
return _Report(
target_power=target_power,
_inclusion_bounds=timeseries.Bounds(target_power, target_power),
_exclusion_bounds=None,
)


@pytest.fixture(autouse=True)
def event_loop_policy() -> async_solipsism.EventLoopPolicy:
"""Return an event loop policy that uses the async solipsism event loop."""
Expand Down Expand Up @@ -1200,3 +1216,58 @@ async def run_temperature_test( # pylint: disable=too-many-locals
streamer.start_streaming(latest_data, sampling_rate=0.1)
msg = await asyncio.wait_for(receiver.receive(), timeout=waiting_time_sec)
compare_messages(msg, Sample(now, Temperature.from_celsius(15.0)))


async def test_power_status_same_instance_subscriptions_work(
mocker: MockerFixture,
) -> None:
"""Ensure same-instance power_status subscriptions share the same channel."""
mock_cm = MagicMock()
mock_graph = MagicMock()
mock_graph.components.return_value = [
MagicMock(id=ComponentId(8)),
MagicMock(id=ComponentId(18)),
]
mock_cm.component_graph = mock_graph
mocker.patch(
"frequenz.sdk.microgrid.connection_manager._CONNECTION_MANAGER",
mock_cm,
)
mocker.patch("frequenz.sdk.microgrid.connection_manager.get", return_value=mock_cm)

registry = ChannelRegistry(name="battery-pool-test")
requests_channel = Broadcast[ReportRequest](name="battery-pool-requests")
requests_rx = requests_channel.new_receiver()
component_ids = frozenset({ComponentId(8), ComponentId(18)})
pool = BatteryPool(
pool_ref_store=BatteryPoolReferenceStore(
channel_registry=registry,
resampler_subscription_sender=MagicMock(),
batteries_status_receiver=MagicMock(),
power_manager_requests_sender=MagicMock(),
power_manager_bounds_subscription_sender=requests_channel.new_sender(),
power_distribution_results_fetcher=MagicMock(),
min_update_interval=timedelta(seconds=1),
batteries_id=component_ids,
),
name="battery-pool",
priority=5,
)

first_status_rx = pool.power_status.new_receiver()
second_status_rx = pool.power_status.new_receiver()

await asyncio.sleep(0)

first_request = await asyncio.wait_for(requests_rx.receive(), timeout=1.0)
second_request = await asyncio.wait_for(requests_rx.receive(), timeout=1.0)
assert second_request.get_channel_name() == first_request.get_channel_name()

await registry.get_or_create(
_Report, first_request.get_channel_name()
).new_sender().send(_new_power_status_report(123.0))

first_report = await asyncio.wait_for(first_status_rx.receive(), timeout=1.0)
second_report = await asyncio.wait_for(second_status_rx.receive(), timeout=1.0)
assert first_report.target_power == Power.from_watts(123.0)
assert second_report.target_power == Power.from_watts(123.0)
77 changes: 76 additions & 1 deletion tests/timeseries/_ev_charger_pool/test_ev_charger_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,34 @@
"""Tests for the `EVChargerPool`."""


import asyncio
from unittest.mock import MagicMock

from frequenz.channels import Broadcast
from frequenz.client.common.microgrid.components import ComponentId
from frequenz.quantities import Power
from pytest_mock import MockerFixture

from frequenz.sdk import microgrid
from frequenz.sdk import microgrid, timeseries
from frequenz.sdk._internal._channels import ChannelRegistry
from frequenz.sdk.microgrid._power_managing import ReportRequest, _Report
from frequenz.sdk.timeseries.ev_charger_pool import EVChargerPool
from frequenz.sdk.timeseries.ev_charger_pool._ev_charger_pool_reference_store import (
EVChargerPoolReferenceStore,
)
from tests.timeseries.mock_microgrid import MockMicrogrid


def _new_power_status_report(target_power_watts: float) -> _Report:
"""Create a distinct report for power status assertions."""
target_power = Power.from_watts(target_power_watts)
return _Report(
target_power=target_power,
_inclusion_bounds=timeseries.Bounds(target_power, target_power),
_exclusion_bounds=None,
)


class TestEVChargerPool:
"""Tests for the `EVChargerPool`."""

Expand All @@ -28,3 +49,57 @@ async def test_ev_power( # pylint: disable=too-many-locals

await mockgrid.mock_resampler.send_meter_power([16.0])
assert (await power_receiver.receive()).value == Power.from_watts(16.0)


async def test_power_status_same_instance_subscriptions_work(
mocker: MockerFixture,
) -> None:
"""Ensure same-instance power_status subscriptions share the same channel."""
mock_cm = MagicMock()
mock_graph = MagicMock()
mock_graph.components.return_value = [
MagicMock(id=ComponentId(12)),
MagicMock(id=ComponentId(22)),
]
mock_cm.component_graph = mock_graph
mocker.patch(
"frequenz.sdk.microgrid.connection_manager._CONNECTION_MANAGER",
mock_cm,
)
mocker.patch("frequenz.sdk.microgrid.connection_manager.get", return_value=mock_cm)

registry = ChannelRegistry(name="ev-pool-test")
requests_channel = Broadcast[ReportRequest](name="ev-pool-requests")
requests_rx = requests_channel.new_receiver()
component_ids = frozenset({ComponentId(12), ComponentId(22)})
pool = EVChargerPool(
pool_ref_store=EVChargerPoolReferenceStore(
channel_registry=registry,
resampler_subscription_sender=MagicMock(),
status_receiver=MagicMock(),
power_manager_requests_sender=MagicMock(),
power_manager_bounds_subs_sender=requests_channel.new_sender(),
power_distribution_results_fetcher=MagicMock(),
component_ids=component_ids,
),
name="ev-pool",
priority=5,
)

first_status_rx = pool.power_status.new_receiver()
second_status_rx = pool.power_status.new_receiver()

await asyncio.sleep(0)

first_request = await asyncio.wait_for(requests_rx.receive(), timeout=1.0)
second_request = await asyncio.wait_for(requests_rx.receive(), timeout=1.0)
assert second_request.get_channel_name() == first_request.get_channel_name()

await registry.get_or_create(
_Report, first_request.get_channel_name()
).new_sender().send(_new_power_status_report(123.0))

first_report = await asyncio.wait_for(first_status_rx.receive(), timeout=1.0)
second_report = await asyncio.wait_for(second_status_rx.receive(), timeout=1.0)
assert first_report.target_power == Power.from_watts(123.0)
assert second_report.target_power == Power.from_watts(123.0)
Loading
Loading