Remove channel registry #1371

Open
simonvoelcker wants to merge 26 commits into frequenz-floss:v1.x.x from simonvoelcker:remove-channel-registry

Conversation

@simonvoelcker
Contributor

@simonvoelcker simonvoelcker commented Mar 5, 2026

This removes the ChannelRegistry class.

Background

This is part of an initiative to close unused channels to save resources. The ChannelRegistry was a de-facto singleton class that stored channels by name. Request objects such as ComponentMetricRequest or ReportRequest have a get_channel_name() method, and the returned channel name acted as a contract between sender and receiver: with the name, both sides could obtain the same channel from the ChannelRegistry. By getting rid of the registry, we move towards a target state where only references to senders and receivers are held, but no channels.
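
For illustration, the old contract worked roughly like this (a minimal sketch; the real ChannelRegistry API differs in details such as typing):

from frequenz.channels import Broadcast

class ToyChannelRegistry:
    """Illustrative stand-in for the removed ChannelRegistry."""

    def __init__(self) -> None:
        self._channels: dict[str, Broadcast] = {}

    def get_or_create(self, name: str) -> Broadcast:
        # Same name, same channel: the name is the sender/receiver contract.
        if name not in self._channels:
            self._channels[name] = Broadcast(name=name)
        return self._channels[name]

registry = ToyChannelRegistry()
name = "metrics:cid=4:AC_ACTIVE_POWER"  # as produced by get_channel_name()
sender = registry.get_or_create(name).new_sender()      # producer side
receiver = registry.get_or_create(name).new_receiver()  # requester side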

Implementation

ComponentMetricRequest

ComponentMetricRequest objects represent a request to receive metrics from a component (sometimes also referred to as a telemetry stream). To serve such a request without relying on the ChannelRegistry, the request got a new attribute: telem_stream_sender: Sender[Receiver[Sample[Quantity]]]. This is the sender of a one-shot channel that is used to send the receiver of the telemetry stream back to the original requester, similar to a callback function. The receiver of a component metric request should therefore create (and own) the channel that carries the metric data, and use the one-shot channel to send a receiver of that channel "back".
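
From the requester's side, the round trip looks roughly like this (a minimal sketch; the ComponentMetricRequest import path is assumed from this PR's file layout, and some parameter types are elided):

from frequenz.channels import BroadcastReceiver, OneshotChannel, Sender
from frequenz.quantities import Quantity

# Import path assumed, based on the files touched in this PR:
from frequenz.sdk.microgrid._data_sourcing import ComponentMetricRequest
from frequenz.sdk.timeseries import Sample

async def request_metric_stream(
    request_sender: Sender[ComponentMetricRequest],
    component_id,  # ComponentId, type elided for brevity
    metric,        # Metric, type elided for brevity
) -> BroadcastReceiver[Sample[Quantity]]:
    # The one-shot sender travels with the request; the receiver stays here.
    reply_sender, reply_receiver = OneshotChannel[
        BroadcastReceiver[Sample[Quantity]]
    ]()
    await request_sender.send(
        ComponentMetricRequest(
            namespace="example",
            component_id=component_id,
            metric=metric,
            start_time=None,
            telem_stream_sender=reply_sender,
        )
    )
    # The serving side creates (and owns) the data channel and sends a
    # receiver for it "back" through the one-shot channel.
    return await reply_receiver.receive()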

In a few places, component metric requests were created and sent from synchronous functions that returned the receiver. To make this work with the round trip outlined above, an additional channel was added that merely forwards the metric data via a Pipe. This allows us to immediately return a receiver while the channel that serves the metric data is being constructed asynchronously.
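
The forwarding pattern looks roughly like this (a minimal sketch, assuming the Pipe(receiver, sender) / start() API used elsewhere in this PR; parameter types are kept loose):

import asyncio

# Pipe import path assumed; this PR uses Pipe(receiver, sender) with .start().
from frequenz.channels import Broadcast, Pipe, Receiver, Sender

def new_receiver_sync(
    request_sender: Sender,    # where the metric request is sent
    request,                   # the request, carrying the one-shot sender
    reply_receiver: Receiver,  # our end of the one-shot channel
) -> Receiver:
    """Return a receiver immediately; wire up the real stream in the background."""
    # The forwarding channel exists right away, so a receiver can be handed
    # out synchronously while the data channel is still being set up.
    forwarding = Broadcast(name="example:Forwarded")

    async def connect() -> None:
        await request_sender.send(request)
        upstream = await reply_receiver.receive()
        # The Pipe forwards every message from the upstream receiver into
        # the forwarding channel.
        pipe = Pipe(upstream, forwarding.new_sender())
        await pipe.start()

    # In the real code, references to this task and the pipe must be kept
    # alive; see the channel ownership notes below.
    asyncio.create_task(connect())
    return forwarding.new_receiver()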

ReportRequest

ReportRequest objects are a different kind of request; they were handled in the same way.

Channel ownership

The receivers of request objects create the channels and own them. For instance, in the case of ComponentMetricRequest, MicrogridApiSource now maintains a dictionary (channel_lookup: dict[str, Broadcast]) as a replacement for the channel registry. This lookup dictionary is necessary to avoid creating duplicate channels for equivalent metric requests; in that case, a new receiver is created from the existing channel found in the lookup dictionary.
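
Schematically (a minimal sketch of the lookup, not the actual MicrogridApiSource code):

from frequenz.channels import Broadcast, BroadcastReceiver

channel_lookup: dict[str, Broadcast] = {}

def receiver_for(channel_name: str) -> BroadcastReceiver:
    """Reuse the channel for an equivalent request, or create and own a new one."""
    if channel_name not in channel_lookup:
        channel_lookup[channel_name] = Broadcast(name=channel_name)
    # Each requester gets a fresh receiver of the (possibly shared) channel.
    return channel_lookup[channel_name].new_receiver()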

Forwarding channels are owned by their creator (e.g. GridFrequency). These (and the Pipes that connect them to the other channels) are currently never closed. Closing them once all receivers or senders are closed is a TODO for a later iteration.

@github-actions github-actions Bot added the part:tests (Affects the unit, integration and performance (benchmarks) tests), part:tooling (Affects the development tooling (CI, deployment, dependency management, etc.)), part:data-pipeline (Affects the data pipeline), part:core (Affects the SDK core components (data structures, etc.)) and part:microgrid (Affects the interactions with the microgrid) labels on Mar 5, 2026
@simonvoelcker simonvoelcker force-pushed the remove-channel-registry branch 2 times, most recently from f4e6125 to 9186ad8, on March 5, 2026 19:16
@github-actions github-actions Bot added the part:docs (Affects the documentation) label on Mar 5, 2026
@simonvoelcker simonvoelcker force-pushed the remove-channel-registry branch 4 times, most recently from a878b7a to 22cd7c5, on March 6, 2026 14:42
Comment thread pyproject.toml Outdated
@simonvoelcker simonvoelcker marked this pull request as ready for review March 6, 2026 15:17
@simonvoelcker simonvoelcker requested a review from a team as a code owner March 6, 2026 15:17
@simonvoelcker simonvoelcker requested review from florian-wagner-frequenz and removed request for a team March 6, 2026 15:17
@llucax
Contributor

llucax commented Mar 9, 2026

Sorry to be a party pooper. I just started with the review, and looking at the data sourcing actor, I think we might be overcomplicating things.

Why can't we just reverse the ownership and make the requester own the "data receiving channel"? Instead of creating a oneshot channel¹, the requester creates a normal channel and gives the other end the sender. That's it! No back-and-forth sending and receiving of receivers.
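
In code, the idea would be roughly this (a very rough sketch; the MetricRequest shape and its data_sender field are made up for illustration):

from dataclasses import dataclass

from frequenz.channels import Broadcast, Receiver, Sender

@dataclass
class MetricRequest:
    """Hypothetical request shape for the reversed-ownership idea."""

    component_id: int
    metric: str
    data_sender: Sender  # sender of the requester-owned data channel

async def subscribe(request_sender: Sender[MetricRequest]) -> Receiver:
    # The requester owns the data channel outright...
    data_channel = Broadcast(name="cid=4:AC_ACTIVE_POWER")
    # ...and ships only the sender to the data sourcing actor.
    await request_sender.send(
        MetricRequest(4, "AC_ACTIVE_POWER", data_channel.new_sender())
    )
    # A receiver is available immediately, no one-shot round trip needed.
    return data_channel.new_receiver()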

The only disadvantage of this approach is that we can't de-duplicate channels, so to speak, so no broadcasting like we do now (the data sourcing actor creates one channel per metric-cid combination, I think, and creates new receivers for any new party asking for data). We could still de-duplicate by subscribing to the microgrid API once and sending to multiple senders, but we'd still have one channel per sender. But broadcast channels actually use one queue per receiver, so in terms of memory usage it shouldn't change much.

So, this is a very early thought and I didn't think through all the other potential issues, but looking at the changes, it really feels to me that we are introducing non-trivial complexity with this change. For example, the new requirement of being async to subscribe to data is gone with this approach. I think that's my main concern: we are converting something that is inherently sync (creating a new receiver to receive data) into something async, and that's very disruptive.

I will continue with my review, but since we are planning to build on top of this, I wanted to raise this as early as possible.

Footnotes

  1. See @sahas-subramanian-frequenz? I didn't think about it and basically wrote create_oneshot_channel(), so that must be a good name :trollface:

@simonvoelcker simonvoelcker force-pushed the remove-channel-registry branch 2 times, most recently from 38fb0ab to a28fab2, on March 9, 2026 13:56
@florian-wagner-frequenz florian-wagner-frequenz requested review from llucax and shsms and removed request for florian-wagner-frequenz March 9, 2026 13:56
@simonvoelcker
Contributor Author

@llucax I completely agree that this change adds regrettable complexity, and we should explore ways of avoiding it.

It is my understanding that channels must be de-duplicated somehow, because in practice we might be dealing with hundreds of components. The channels must then be owned centrally, whether that's near the source or in a singleton registry. Maybe we can revisit the registry idea, but store senders there instead of channels? That would avoid the oneshot channels.
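
A rough sketch of that variant (illustrative only, not a concrete API proposal):

from frequenz.channels import Broadcast, Sender

# Hypothetical registry storing senders by channel name instead of channels.
sender_registry: dict[str, Sender] = {}

# Requester side: owns the channel, registers only the sender under the
# contract name, and gets a receiver synchronously.
channel = Broadcast(name="metrics:cid=4:AC_ACTIVE_POWER")
sender_registry["metrics:cid=4:AC_ACTIVE_POWER"] = channel.new_sender()
receiver = channel.new_receiver()

# Producer side: looks up the sender by the same name and streams into it,
# never holding the channel itself.
producer_sender = sender_registry["metrics:cid=4:AC_ACTIVE_POWER"]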

Regarding sync vs. async: the creation of channels was async before and stays async, no? We're sending ComponentMetricRequests either way; it's just that in one case we discussed today (GridFrequency.new_receiver) a receiver must be returned synchronously, so we have to use the forwarding channel.

@llucax
Contributor

llucax commented Mar 9, 2026

The problem with the singleton channel registry, for me, is that when things go wrong it is very hard to find out when a channel was first created, who requested it, etc. This problem came up before and was very hard to debug. That is why I created the option to log when a channel is created in the registry (including the stack trace), but that turns out to pollute the logs excessively when enabled, as you can't really filter it by channel name, so it is printed for all channels.

This is why I think getting rid of the channel registry is a good goal, but I think we agree it shouldn't be done at all costs.

About the de-duplication: as I mentioned, it might be worth taking a deeper look at how much overhead having one channel per metric-cid really adds vs. the current approach of having one receiver per metric-cid. Since broadcast channels keep one queue per receiver, and the sender just iterates over all receivers and puts the new message there, I'm not sure there will be a big difference between having 1 broadcast channel with N receivers vs. having N "unicast channels" (for now I guess we'd use a broadcast channel for those too).

Note

We explored implementing a unicast channel in the past, but I think it was as slow as the broadcast channel, so we kept only the broadcast channel. I guess for unicast we could just use an asyncio queue; I'm not sure if that's what we used to implement the unicast channel.
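
A unicast channel in that spirit is basically just a queue shared by exactly one producer and one consumer (minimal sketch):

import asyncio

async def main() -> None:
    # One producer, one consumer, one queue: no per-receiver fan-out
    # like in a broadcast channel.
    queue: asyncio.Queue[float] = asyncio.Queue()

    async def producer() -> None:
        for value in (1.0, 2.0, 3.0):
            await queue.put(value)

    async def consumer() -> None:
        for _ in range(3):
            print(await queue.get())

    await asyncio.gather(producer(), consumer())

asyncio.run(main())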

Contributor

@llucax llucax left a comment


I will send this partial review, as all commits were split while I was mid-review. I will wait until things settle down to continue the review.

Comment thread src/frequenz/sdk/_internal/_channels.py Outdated
Comment thread src/frequenz/sdk/microgrid/_data_sourcing/_component_metric_request.py Outdated
Comment thread src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py Outdated
Comment thread benchmarks/timeseries/benchmark_datasourcing.py Outdated
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
@simonvoelcker simonvoelcker added the cmd:skip-release-notes (It is not necessary to update release notes for this PR) label on Apr 2, 2026
@simonvoelcker simonvoelcker force-pushed the remove-channel-registry branch from 16d734a to ca39198, on April 2, 2026 11:00
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
@simonvoelcker simonvoelcker force-pushed the remove-channel-registry branch from ca39198 to 3e65895, on April 9, 2026 11:47
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
@simonvoelcker simonvoelcker force-pushed the remove-channel-registry branch from 0508095 to f6ca3ee, on April 14, 2026 11:40
@llucax
Contributor

llucax commented Apr 20, 2026

I guess with the channels release this is ready for a final review. Let me know (request a review) when it is ready for another round.

Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
…ew, but only once

Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
@simonvoelcker simonvoelcker force-pushed the remove-channel-registry branch from 9fa75ab to 213f897, on April 20, 2026 13:42
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
@simonvoelcker simonvoelcker force-pushed the remove-channel-registry branch from 213f897 to d6c8706, on April 20, 2026 14:24
@simonvoelcker
Contributor Author

@llucax I think it's ready again. Tbh I also lost some context since the last time I worked on this, and kinda ran out of steam towards the end. I would appreciate it if we could move this over the line so I can continue with phase 2.

@simonvoelcker simonvoelcker requested a review from llucax April 21, 2026 07:47
@llucax
Contributor

llucax commented Apr 21, 2026

Oh, it needs a conflict in the benchmark resolved now, but I will review anyway; it is a very isolated part.

@simonvoelcker
Contributor Author

@llucax yeah, I saw those and already resolved them locally. I'll push that after your review then, so as not to interfere. It's isolated, as you say.

Contributor

@llucax llucax left a comment


I hate to revive old discussions, but the more we insist on this approach of using oneshot to replace the channel registry, the more I'm convinced this is not the right approach.

In this review round I found 3 serious and very obscure concurrency bugs. To be honest, I'm not sure I would have caught these issues if I hadn't used AI, as some issues can only be understood by looking at parts of the code that are not even in the diff (I asked for the AI review first, so we'll never know). I still spent about 3 hours trying to understand and validate the issues, so it is also not like I just asked the AI and it was zero effort. That said, it is also possible that I didn't get it right and some issue is not that serious, or extremely unlikely, but still...

This leaves me with very little confidence that we have uncovered all the potential issues with this approach, so I suggest stepping back again and thinking about whether we really want to move forward with it.

My feeling is that we are planting a minefield here, and for what? Why are we removing the channel registry? If it is because of auto-closing, can't we just introduce auto-closing while keeping the channel registry? Maybe we can keep the channel class as non-deprecated and allow both usages of channels, one owning the whole channel and one driven by senders/receivers, or we can have some other mechanism to spawn more senders/receivers for an existing channel as long as it is not closed.

I'm sure we can find a solution, but to me, adding this level of complexity is not acceptable, and it will haunt us in the future. Even if we get this PR right and bug-free, we'll need to add more code that will have the same likelihood of obscure concurrency bugs.

In any case, I also asked the AI to add test cases to expose these issues. I think it makes sense to add them, and here is a PR to add them to v1.x.x:

New test cases diff for this branch

[!NOTE]
This is an old iteration; I wanted to update it but I'm hitting some rate limiting :D

diff --git a/tests/actor/test_resampling.py b/tests/actor/test_resampling.py
index 6e8ee9389..f9e6dce73 100644
--- a/tests/actor/test_resampling.py
+++ b/tests/actor/test_resampling.py
@@ -266,3 +266,47 @@ def _now
         )
 
         await resampling_actor._resampler.stop()  # pylint: disable=protected-access
+
+
+async def test_stalled_request_does_not_block_later_subscriptions() -> None:
+    """Ensure one stalled data-source handshake doesn't block later requests."""
+    data_source_req_chan = Broadcast[ComponentMetricRequest](name="data-source-req")
+    data_source_req_recv = data_source_req_chan.new_receiver()
+    resampling_req_chan = Broadcast[ComponentMetricRequest](name="resample-req")
+    resampling_req_sender = resampling_req_chan.new_sender()
+
+    async with ComponentMetricsResamplingActor(
+        data_sourcing_request_sender=data_source_req_chan.new_sender(),
+        resampling_request_receiver=resampling_req_chan.new_receiver(),
+        config=ResamplerConfig2(
+            resampling_period=timedelta(seconds=0.2),
+            max_data_age_in_periods=2,
+        ),
+    ):
+        first_sender, _ = OneshotChannel[BroadcastReceiver[Sample[Quantity]]]()
+        second_sender, _ = OneshotChannel[BroadcastReceiver[Sample[Quantity]]]()
+
+        first_request = ComponentMetricRequest(
+            namespace="Resampling-A",
+            component_id=ComponentId(9),
+            metric=Metric.BATTERY_SOC_PCT,
+            start_time=None,
+            telem_stream_sender=first_sender,
+        )
+        second_request = ComponentMetricRequest(
+            namespace="Resampling-B",
+            component_id=ComponentId(10),
+            metric=Metric.AC_ACTIVE_POWER,
+            start_time=None,
+            telem_stream_sender=second_sender,
+        )
+
+        await resampling_req_sender.send(first_request)
+        stalled_data_source_request = await data_source_req_recv.receive()
+        assert stalled_data_source_request.component_id == ComponentId(9)
+
+        await resampling_req_sender.send(second_request)
+        later_data_source_request = await asyncio.wait_for(
+            data_source_req_recv.receive(), timeout=0.1
+        )
+        assert later_data_source_request.component_id == ComponentId(10)
diff --git a/tests/microgrid/test_data_sourcing.py b/tests/microgrid/test_data_sourcing.py
index c18031bc9..577eee20c 100644
--- a/tests/microgrid/test_data_sourcing.py
+++ b/tests/microgrid/test_data_sourcing.py
@@ -11,7 +11,13 @@
 
 import pytest
 import pytest_mock
-from frequenz.channels import Broadcast, BroadcastReceiver, OneshotChannel
+from frequenz.channels import (
+    Broadcast,
+    BroadcastReceiver,
+    OneshotChannel,
+    Receiver,
+    ReceiverStoppedError,
+)
 from frequenz.client.common.microgrid import MicrogridId
 from frequenz.client.common.microgrid.components import ComponentId
 from frequenz.client.microgrid.component import (
@@ -37,11 +43,30 @@
 )
 from frequenz.sdk.timeseries import Sample
 
+from ..utils.receive_timeout import Timeout
+
 T = TypeVar("T", bound=ComponentData)
+U = TypeVar("U")
 
 _MICROGRID_ID = MicrogridId(1)
 
 
+class Stopped:
+    """Sentinel for a closed receiver."""
+
+
+async def _receive_or_timeout(
+    receiver: Receiver[U], timeout: float = 0.1
+) -> U | type[Timeout] | type[Stopped]:
+    """Receive a message, or return a sentinel if the receiver stops or times out."""
+    try:
+        return await asyncio.wait_for(receiver.receive(), timeout=timeout)
+    except ReceiverStoppedError:
+        return Stopped
+    except asyncio.TimeoutError:
+        return Timeout
+
+
 @pytest.fixture
 def mock_connection_manager(mocker: pytest_mock.MockFixture) -> mock.Mock:
     """Fixture for getting a mock connection manager."""
@@ -124,6 +149,102 @@ def mock_connection_manager
             assert expected_sample_value + i == sample.value.base_value
 
 
+async def test_duplicate_requests_reply_to_all_callers(
+    mock_connection_manager: mock.Mock,  # pylint: disable=redefined-outer-name,unused-argument
+) -> None:
+    """Ensure duplicate direct subscriptions reply to every requester."""
+    req_chan = Broadcast[ComponentMetricRequest](name="data_sourcing_requests")
+    req_sender = req_chan.new_sender()
+
+    async with DataSourcingActor(req_chan.new_receiver()):
+        first_sender, first_receiver = OneshotChannel[
+            BroadcastReceiver[Sample[Quantity]]
+        ]()
+        second_sender, second_receiver = OneshotChannel[
+            BroadcastReceiver[Sample[Quantity]]
+        ]()
+
+        request = ComponentMetricRequest(
+            "test-namespace",
+            ComponentId(4),
+            Metric.AC_ACTIVE_POWER,
+            None,
+            first_sender,
+        )
+        duplicate_request = ComponentMetricRequest(
+            "test-namespace",
+            ComponentId(4),
+            Metric.AC_ACTIVE_POWER,
+            None,
+            second_sender,
+        )
+
+        await req_sender.send(request)
+        _ = await asyncio.wait_for(first_receiver.receive(), timeout=1.0)
+
+        await req_sender.send(duplicate_request)
+        _ = await asyncio.wait_for(second_receiver.receive(), timeout=1.0)
+
+
+async def test_unknown_component_request_does_not_wait_forever(
+    mock_connection_manager: mock.Mock,  # pylint: disable=redefined-outer-name,unused-argument
+) -> None:
+    """Ensure invalid component requests fail fast instead of hanging callers."""
+    req_chan = Broadcast[ComponentMetricRequest](name="data_sourcing_requests")
+    req_sender = req_chan.new_sender()
+
+    async with DataSourcingActor(req_chan.new_receiver()):
+        telem_stream_sender, telem_stream_receiver = OneshotChannel[
+            BroadcastReceiver[Sample[Quantity]]
+        ]()
+        request = ComponentMetricRequest(
+            "test-namespace",
+            ComponentId(999),
+            Metric.AC_ACTIVE_POWER,
+            None,
+            telem_stream_sender,
+        )
+
+        await req_sender.send(request)
+        result = await _receive_or_timeout(telem_stream_receiver)
+        assert result is not Timeout
+
+
+async def test_invalid_metric_request_does_not_block_later_requests(
+    mock_connection_manager: mock.Mock,  # pylint: disable=redefined-outer-name,unused-argument
+) -> None:
+    """Ensure a bad request doesn't stall later valid subscriptions."""
+    req_chan = Broadcast[ComponentMetricRequest](name="data_sourcing_requests")
+    req_sender = req_chan.new_sender()
+
+    async with DataSourcingActor(req_chan.new_receiver()):
+        invalid_sender, _ = OneshotChannel[BroadcastReceiver[Sample[Quantity]]]()
+        valid_sender, valid_receiver = OneshotChannel[
+            BroadcastReceiver[Sample[Quantity]]
+        ]()
+
+        invalid_request = ComponentMetricRequest(
+            "test-namespace-invalid",
+            ComponentId(4),
+            Metric.BATTERY_SOC_PCT,
+            None,
+            invalid_sender,
+        )
+        valid_request = ComponentMetricRequest(
+            "test-namespace-valid",
+            ComponentId(4),
+            Metric.AC_ACTIVE_POWER,
+            None,
+            valid_sender,
+        )
+
+        await req_sender.send(invalid_request)
+        await req_sender.send(valid_request)
+
+        result = await _receive_or_timeout(valid_receiver)
+        assert result is not Timeout
+
+
 def _new_meter_data(
     component_id: ComponentId, timestamp: datetime, value: float
 ) -> MeterData:
diff --git a/tests/timeseries/test_power_status_subscription_races.py b/tests/timeseries/test_power_status_subscription_races.py
new file mode 100644
index 000000000..a99ce5c02
--- /dev/null
+++ b/tests/timeseries/test_power_status_subscription_races.py
@@ -0,0 +1,140 @@
+# License: MIT
+# Copyright © 2026 Frequenz Energy-as-a-Service GmbH
+
+"""Regression tests for pool power_status subscription races."""
+
+import asyncio
+from types import SimpleNamespace
+
+from frequenz.channels import Broadcast, Receiver, Sender
+from frequenz.client.common.microgrid.components import ComponentId
+from frequenz.quantities import Power
+
+from frequenz.sdk import timeseries
+from frequenz.sdk.microgrid._power_managing import ReportRequest, _Report
+from frequenz.sdk.timeseries.battery_pool._battery_pool import BatteryPool
+from frequenz.sdk.timeseries.ev_charger_pool._ev_charger_pool import EVChargerPool
+from frequenz.sdk.timeseries.pv_pool._pv_pool import PVPool
+
+
+def _new_report(target_power_watts: float) -> _Report:
+    """Create a distinct report for subscription assertions."""
+    target_power = Power.from_watts(target_power_watts)
+    return _Report(
+        target_power=target_power,
+        _inclusion_bounds=timeseries.Bounds(target_power, target_power),
+        _exclusion_bounds=None,
+    )
+
+
+async def _assert_same_instance_subscriptions_are_independent(
+    pool: BatteryPool | EVChargerPool | PVPool,
+    requests_rx: Receiver[ReportRequest],
+) -> None:
+    """Ensure two subscriptions on the same pool instance remain independent."""
+    existing_tasks = {
+        task for task in asyncio.all_tasks() if task is not asyncio.current_task()
+    }
+
+    try:
+        first_status_rx = pool.power_status.new_receiver()
+        second_status_rx = pool.power_status.new_receiver()
+
+        await asyncio.sleep(0)
+
+        first_request = await asyncio.wait_for(requests_rx.receive(), timeout=1.0)
+        second_request = await asyncio.wait_for(requests_rx.receive(), timeout=1.0)
+
+        first_report_channel = Broadcast[_Report](name="first-report", resend_latest=True)
+        second_report_channel = Broadcast[_Report](
+            name="second-report", resend_latest=True
+        )
+
+        await first_request.report_stream_sender.send(first_report_channel.new_receiver())
+        await second_request.report_stream_sender.send(
+            second_report_channel.new_receiver()
+        )
+
+        await asyncio.sleep(0)
+
+        first_report = _new_report(100.0)
+        second_report = _new_report(200.0)
+
+        await first_report_channel.new_sender().send(first_report)
+        await second_report_channel.new_sender().send(second_report)
+
+        received_first_report = await asyncio.wait_for(
+            first_status_rx.receive(), timeout=0.1
+        )
+        received_second_report = await asyncio.wait_for(
+            second_status_rx.receive(), timeout=0.1
+        )
+
+        assert received_first_report.target_power == first_report.target_power
+        assert received_second_report.target_power == second_report.target_power
+    finally:
+        created_tasks = [
+            task
+            for task in asyncio.all_tasks()
+            if task is not asyncio.current_task() and task not in existing_tasks
+        ]
+        for task in created_tasks:
+            task.cancel()
+        await asyncio.gather(*created_tasks, return_exceptions=True)
+
+
+async def test_battery_pool_power_status_same_instance_subscriptions_are_independent() -> None:
+    """Ensure battery pool power_status subscriptions don't race each other."""
+    requests_channel = Broadcast[ReportRequest](name="battery-pool-requests")
+    pool = BatteryPool(
+        pool_ref_store=SimpleNamespace(
+            _batteries=frozenset({ComponentId(8), ComponentId(18)}),
+            _power_manager_bounds_subscription_sender=requests_channel.new_sender(),
+            _power_bounds_subs={},
+        ),
+        name="battery-pool",
+        priority=5,
+    )
+
+    await _assert_same_instance_subscriptions_are_independent(
+        pool,
+        requests_channel.new_receiver(),
+    )
+
+
+async def test_ev_charger_pool_power_status_same_instance_subscriptions_are_independent() -> None:
+    """Ensure EV charger pool power_status subscriptions don't race each other."""
+    requests_channel = Broadcast[ReportRequest](name="ev-pool-requests")
+    pool = EVChargerPool(
+        pool_ref_store=SimpleNamespace(
+            component_ids=frozenset({ComponentId(12), ComponentId(22)}),
+            power_manager_bounds_subs_sender=requests_channel.new_sender(),
+            power_bounds_subs={},
+        ),
+        name="ev-pool",
+        priority=5,
+    )
+
+    await _assert_same_instance_subscriptions_are_independent(
+        pool,
+        requests_channel.new_receiver(),
+    )
+
+
+async def test_pv_pool_power_status_same_instance_subscriptions_are_independent() -> None:
+    """Ensure PV pool power_status subscriptions don't race each other."""
+    requests_channel = Broadcast[ReportRequest](name="pv-pool-requests")
+    pool = PVPool(
+        pool_ref_store=SimpleNamespace(
+            component_ids=frozenset({ComponentId(28), ComponentId(38)}),
+            power_manager_bounds_subs_sender=requests_channel.new_sender(),
+            power_bounds_subs={},
+        ),
+        name="pv-pool",
+        priority=5,
+    )
+
+    await _assert_same_instance_subscriptions_are_independent(
+        pool,
+        requests_channel.new_receiver(),
+    )

Comment on lines 531 to 541
        for existing_request in self._req_streaming_metrics[comp_id][request.metric]:
            if existing_request.get_channel_name() == request.get_channel_name():
                # the requested metric is already being handled, so nothing to do.
                return

        self._req_streaming_metrics[comp_id][request.metric].append(request)

        await self._update_streams(
            comp_id,
            category,
        )
Contributor


It seems we have a problem here. Found by AI but seems to make sense:

If the requested metric was already requested or the component ID is unknown (category is None), we just return, which means we don't call self._update_streams(), which means we don't call self._handle_data_stream(), which means we don't call self._get_metric_senders(), which means we don't call await request.telem_stream_sender.send(telem_stream.new_receiver()), which means we never reply to the requester.

I guess we can reply here before the return, but this is getting very hard to follow, so I'm not sure this is the best approach. Maybe do the replying completely outside that whole call stack, in this function, right after we either created the new channel or got the existing one, but I guess that would imply awaiting on the task that is creating the new channel.

Full AI analysis including a proposed fix (long)

What goes wrong

A direct caller now creates a fresh oneshot channel for every subscription request and waits for a BroadcastReceiver to be handed back.

You can see that in:

  • GridFrequency.subscribe() at src/frequenz/sdk/timeseries/_grid_frequency.py:116-130
  • GridFrequency.new_receiver()’s background path at src/frequenz/sdk/timeseries/_grid_frequency.py:107-114, 132-140
  • ResampledStreamFetcher.fetch_stream() at src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py:52-65

Each of those callers expects this sequence:

  1. create OneshotChannel
  2. send ComponentMetricRequest(..., telem_stream_sender=...)
  3. wait on telem_stream_receiver.receive()

The request object itself makes the dedupe key explicit:

  • ComponentMetricRequest.get_channel_name() is based on namespace, component_id, metric, and optional start_time
  • see src/frequenz/sdk/microgrid/_data_sourcing/_component_metric_request.py:59-73

So two requests for the same logical stream have the same get_channel_name(), but different oneshot senders.

The exact failure path

In MicrogridApiSource.add_metric():

for existing_request in self._req_streaming_metrics[comp_id][request.metric]:
    if existing_request.get_channel_name() == request.get_channel_name():
        # the requested metric is already being handled, so nothing to do.
        return

That is at:

  • src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py:531-534

The bug is that “nothing to do” is false now.

Why? Because the only place that replies to a caller’s oneshot sender is later, in _get_metric_senders():

  • src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py:412-423

Specifically:

await request.telem_stream_sender.send(telem_stream.new_receiver())

But _get_metric_senders() only iterates over the requests that are still present in req_list.

So if request B is deduped against request A:

  • A stays in req_list
  • B is dropped early
  • only A gets its oneshot reply
  • B waits forever on receive()

Why this hangs forever

The second caller is waiting on a oneshot receiver that will never get a value.

There is no timeout in:

  • GridFrequency.subscribe() (_grid_frequency.py:128-129)
  • GridFrequency._send_request() (_grid_frequency.py:138-140)
  • ResampledStreamFetcher.fetch_stream() (_resampled_stream_fetcher.py:64-65)

So if no one ever sends on that oneshot sender, the waiter just sits there.

Why subscribe() + deprecated new_receiver() is broken too

GridFrequency has two request paths now:

New path

subscribe() creates a fresh oneshot sender every time:

  • _grid_frequency.py:118-126

Deprecated path

new_receiver() uses a pre-created oneshot receiver stored on the instance and starts _send_request() once:

  • _grid_frequency.py:107-114
  • _grid_frequency.py:132-140

Both paths generate requests with the same logical stream key:

  • namespace = "grid-frequency"
  • same component id
  • same metric = AC_FREQUENCY

So they collide in add_metric().

That means:

  • if new_receiver() gets there first, then subscribe() can be deduped and hang forever
  • if subscribe() gets there first, then the background _send_request() started by new_receiver() can hang forever waiting on self._telem_stream_receiver.receive()

So the returned receiver from new_receiver() may exist, but it never gets wired to the underlying telemetry stream.

Practical impact

This is high severity because it is:

  • deterministic once the duplicate order happens
  • silent — no exception, just a permanent wait
  • user-facing — it affects public subscription APIs
  • sticky — the waiting task doesn’t recover on its own

The fix

The right non-breaking fix is:

dedupe the underlying stream, but still reply to every caller

In other words:

  • keep only one upstream telemetry channel per get_channel_name()
  • but hand out a fresh BroadcastReceiver to each caller’s oneshot sender

Important subtlety

A naïve fix would be “just append the duplicate request too”.

That does not work, because _get_metric_senders() would then create multiple senders for the same broadcast channel and send duplicate samples into the same stream.

So the fix has to separate:

  1. canonical stream registration
  2. pending caller replies

Suggested fix shape

Introduce a pending-replies map keyed by channel_name, for example:

self._pending_replies: dict[
    str, list[OneshotSender[BroadcastReceiver[Sample[Quantity]]]]
] = {}

In add_metric()

  • always register the caller’s oneshot sender in _pending_replies[channel_name]
  • if this is a duplicate and the channel already exists, reply immediately
  • if this is a duplicate and the channel does not exist yet, return and let the eventual stream setup flush all pending replies
  • only the first request should be appended to _req_streaming_metrics

Sketch:

async def add_metric(self, request: ComponentMetricRequest) -> None:
    comp_id = request.component_id
    category = await self._get_component_category(comp_id)
    if category is None:
        _logger.error("Unknown component ID: %d in request %s", comp_id, request)
        return

    channel_name = request.get_channel_name()
    self._pending_replies.setdefault(channel_name, []).append(
        request.telem_stream_sender
    )

    self._req_streaming_metrics.setdefault(comp_id, {}).setdefault(request.metric, [])

    for existing_request in self._req_streaming_metrics[comp_id][request.metric]:
        if existing_request.get_channel_name() == channel_name:
            if channel_name in self._channels:
                await self._flush_pending_replies(channel_name)
            return

    self._req_streaming_metrics[comp_id][request.metric].append(request)
    await self._update_streams(comp_id, category)

Helper to answer everyone waiting for that logical stream

async def _flush_pending_replies(self, channel_name: str) -> BroadcastSender[Sample[Quantity]]:
    if channel_name not in self._channels:
        self._channels[channel_name] = Broadcast(name=channel_name)

    telem_stream = self._channels[channel_name]

    for sender in self._pending_replies.pop(channel_name, []):
        try:
            await sender.send(telem_stream.new_receiver())
        except SenderClosedError:
            pass

    return telem_stream.new_sender()

In _get_metric_senders()

Deduplicate by channel_name, reply to all pending callers for that channel, and create only one broadcast sender per logical stream:

async def _get_metric_senders(...):
    all_senders = []
    for metric, req_list in requests.items():
        extraction_method = self._get_data_extraction_method(category, metric)
        senders = []
        seen_channel_names: set[str] = set()

        for request in req_list:
            channel_name = request.get_channel_name()
            if channel_name in seen_channel_names:
                continue
            seen_channel_names.add(channel_name)

            senders.append(await self._flush_pending_replies(channel_name))

        all_senders.append((extraction_method, senders))

    return all_senders

Why this fix is good

It preserves all the good parts of the current design:

  • still one underlying stream per logical metric request
  • still one broadcast per logical stream
  • still one fresh receiver per caller
  • still non-breaking API

And it fixes both cases:

  • duplicate direct subscriptions
  • subscribe() mixed with deprecated new_receiver()

One important note

This fix addresses the duplicate request bug.

It does not fix the separate invalid-request bug where data sourcing can return/raise without replying to the oneshot at all. That still needs its own explicit error/close path.

try:
    await request.telem_stream_sender.send(telem_stream.new_receiver())
except SenderClosedError:
    pass
Contributor


I wonder how this could happen and not be an issue: someone would have to create the oneshot channel, send the request, and close the receiver/sender before receiving. I guess that could be a valid way to cancel a request. In any case, I think I would at least add a DEBUG log here saying why we are ignoring this exception (something like "assuming the request was cancelled by the requester").
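
For example (sketch only; _logger is assumed to be the module's existing logger):

try:
    await request.telem_stream_sender.send(telem_stream.new_receiver())
except SenderClosedError:
    # The requester closed its end before receiving the reply; treat this
    # as a cancelled request rather than an error.
    _logger.debug(
        "Reply channel for %s is closed, assuming the request was "
        "cancelled by the requester.",
        request.get_channel_name(),
    )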

Comment on lines 332 to +367
     @property
     def power_status(self) -> ReceiverFetcher[BatteryPoolReport]:
         """Get a receiver to receive new power status reports when they change.

         These include
             - the current inclusion/exclusion bounds available for the pool's priority,
             - the current target power for the pool's set of batteries,
             - the result of the last distribution request for the pool's set of batteries.

         Returns:
             A receiver that will stream power status reports for the pool's priority.
         """
-        sub = _power_managing.ReportRequest(
+        report_stream_sender, self._report_stream_receiver = OneshotChannel[
+            BroadcastReceiver[_Report]
+        ]()
+        request = _power_managing.ReportRequest(
             source_id=self._source_id,
             priority=self._priority,
             component_ids=self._pool_ref_store._batteries,
+            report_stream_sender=report_stream_sender,
         )
+
+        forwarding_channel = Broadcast[_Report](
+            name=request.get_channel_name() + ":Forwarded",
+            resend_latest=True,
+        )
-        self._pool_ref_store._power_bounds_subs[sub.get_channel_name()] = (
+        self._pool_ref_store._power_bounds_subs[request.get_channel_name()] = (
             asyncio.create_task(
-                self._pool_ref_store._power_manager_bounds_subscription_sender.send(sub)
+                self._send_request(forwarding_channel.new_sender(), request)
             )
         )
-        channel = self._pool_ref_store._channel_registry.get_or_create(
-            _power_managing._Report, sub.get_channel_name()
+
+        return forwarding_channel
+
+    async def _send_request(
Contributor


It seems like we have a race condition here. Also an AI-assisted finding, but it looks legit to me.

If power_status is called twice very fast, self._report_stream_receiver is overwritten, losing the reference to the first oneshot channel receiver. The same could happen to self._pipe, but for that to happen the first call must have had time to actually give execution time to the _send_request() task, so that self._pipe is actually assigned; and while await self._pipe.start() is waiting, a new task could override it too. I guess this is much more unlikely, but still possible.

And there is also a potential race for self._pool_ref_store._power_bounds_subs[sub.get_channel_name()], which can lead to losing the reference to the task object. But I think this issue predates this PR, as we already had the task and self._pool_ref_store._power_bounds_subs[sub.get_channel_name()] before. And I think this demonstrates why I didn't like the approach of creating tasks to make async stuff look like sync very much, but we have many of these from before, so I guess getting rid of them and making our async code more sound will be a big undertaking of its own.

As for the fix, we should probably make some of this state task-local by passing it as an argument instead of using shared instance state.

Since it took me a while to understand the whole dimension of this, in case it is useful, this is the whole AI analysis (which includes details on a possible fix):

AI analysis and fix proposal (long)

The Race Condition in power_status — Detailed Explanation

The Setup

Each pool class (BatteryPool, EVChargerPool, PVPool) stores two pieces of per-request handshake state as instance attributes:

self._report_stream_receiver: Receiver[Receiver[_Report]] | None = None
self._pipe: Pipe[_Report] | None = None

When power_status is accessed, it:

  1. Creates a OneshotChannel pair (sender + receiver)
  2. Stores the receiver on self._report_stream_receiver (overwrites any previous value)
  3. Creates a ReportRequest carrying the sender
  4. Spawns an async task (_send_request) that sends the request to the power manager, then reads from self._report_stream_receiver to get the reply

The Race

get_channel_name() returns f"power_manager.report.{self.component_ids=}.{self.priority=}" — it's deterministic for a given pool instance. Two calls on the same instance produce the same key. Here's the interleaving:

Call A:                                     Call B:
──────                                      ──────
sender_A, recv_A = OneshotChannel()
self._report_stream_receiver = recv_A  ←── overwritten below
                                            sender_B, recv_B = OneshotChannel()
                                            self._report_stream_receiver = recv_B
task_A = create_task(_send_request(...))
                                            power_bounds_subs[key] = task_B  ←── overwrites task_A ref
                                            task_B = create_task(_send_request(...))

─── async tasks resume ───

task_A: sends request_A via power manager
        await self._report_stream_receiver.receive()
        ⬆ reads recv_B (not recv_A!) — self was overwritten

task_B: sends request_B via power manager
        await self._report_stream_receiver.receive()
        ⬆ also reads recv_B — contends with task_A

Three bugs compound:

  • self._report_stream_receiver: Task A reads from the wrong oneshot (recv_B). The actor correctly sends reply_A through sender_A → recv_A, but nobody reads recv_A. Task A either gets reply_B (cross-wired) or hangs forever waiting on recv_B if task B consumes it first.
  • self._pipe: When task B creates its Pipe and stores it in self._pipe, task A's Pipe loses its only reference and becomes eligible for GC. The comment on line 87 even says "Keep a reference to prevent garbage collector from destroying pipe" — the race defeats this exact protection.
  • power_bounds_subs[key]: The dict uses get_channel_name() as key, which is identical for both calls. Task A's handle is silently replaced. The task still runs (the event loop holds it), but it can no longer be cancelled via stop().

Why This Hasn't Blown Up (Yet)

In practice, power_status is typically accessed once per pool instance during setup. The race requires two calls before the first task's await yields — a narrow window. But it's a latent correctness bug that becomes a hard-to-diagnose hang under any workload that re-subscribes.


The Fix

The fix follows the pattern already used elsewhere in this codebase (_resampling.py:63–110, _grid_frequency.py:116–130): make the oneshot receiver task-local by passing it as a parameter, and collect pipes in a list instead of a single slot.

Here are the exact changes for all three files:

1. _battery_pool.py

__init__ — replace the two instance attrs with a pipe list:

# BEFORE (lines 86–88):
        self._report_stream_receiver: Receiver[Receiver[_Report]] | None = None
        # Keep a reference to prevent garbage collector from destroying pipe
        self._pipe: Pipe[_Report] | None = None

# AFTER:
        # Keep references to prevent garbage collector from destroying pipes
        self._pipes: list[Pipe[_Report]] = []

power_status — make the receiver local, pass it to the task:

# BEFORE (lines 344–365):
        report_stream_sender, self._report_stream_receiver = OneshotChannel[
            BroadcastReceiver[_Report]
        ]()
        request = _power_managing.ReportRequest(
            source_id=self._source_id,
            priority=self._priority,
            component_ids=self._pool_ref_store._batteries,
            report_stream_sender=report_stream_sender,
        )

        forwarding_channel = Broadcast[_Report](
            name=request.get_channel_name() + ":Forwarded",
            resend_latest=True,
        )

        self._pool_ref_store._power_bounds_subs[request.get_channel_name()] = (
            asyncio.create_task(
                self._send_request(forwarding_channel.new_sender(), request)
            )
        )

        return forwarding_channel

# AFTER:
        report_stream_sender, report_stream_receiver = OneshotChannel[
            BroadcastReceiver[_Report]
        ]()
        request = _power_managing.ReportRequest(
            source_id=self._source_id,
            priority=self._priority,
            component_ids=self._pool_ref_store._batteries,
            report_stream_sender=report_stream_sender,
        )

        forwarding_channel = Broadcast[_Report](
            name=request.get_channel_name() + ":Forwarded",
            resend_latest=True,
        )

        self._pool_ref_store._power_bounds_subs[request.get_channel_name()] = (
            asyncio.create_task(
                self._send_request(
                    forwarding_channel.new_sender(),
                    request,
                    report_stream_receiver,
                )
            )
        )

        return forwarding_channel

_send_request — accept receiver as parameter, append pipe to list:

# BEFORE (lines 367–385):
    async def _send_request(
        self,
        forwarding_sender: Sender[_Report],
        request: ReportRequest,
    ) -> None:
        """Send the report request and receive the report channel.

        Connect it via pipe to the channel that was returned in power_status.
        """
        await self._pool_ref_store._power_manager_bounds_subscription_sender.send(
            request
        )

        assert self._report_stream_receiver is not None
        report_receiver: Receiver[_Report] = (
            await self._report_stream_receiver.receive()
        )
        self._pipe = Pipe(report_receiver, forwarding_sender)
        await self._pipe.start()

# AFTER:
    async def _send_request(
        self,
        forwarding_sender: Sender[_Report],
        request: ReportRequest,
        report_stream_receiver: Receiver[Receiver[_Report]],
    ) -> None:
        """Send the report request and receive the report channel.

        Connect it via pipe to the channel that was returned in power_status.
        """
        await self._pool_ref_store._power_manager_bounds_subscription_sender.send(
            request
        )

        report_receiver: Receiver[_Report] = await report_stream_receiver.receive()
        pipe = Pipe(report_receiver, forwarding_sender)
        self._pipes.append(pipe)
        await pipe.start()

2. _ev_charger_pool.py

The same three-part change, adjusted for this file's naming conventions:

__init__ (lines 73–75):

# BEFORE:
        self._report_stream_receiver: Receiver[Receiver[_Report]] | None = None
        # Keep a reference to prevent garbage collector from destroying pipe
        self._pipe: Pipe[_Report] | None = None

# AFTER:
        # Keep references to prevent garbage collector from destroying pipes
        self._pipes: list[Pipe[_Report]] = []

power_status (lines 186–207):

# BEFORE:
        report_stream_sender, self._report_stream_receiver = OneshotChannel[
            BroadcastReceiver[_Report]
        ]()
        request = _power_managing.ReportRequest(
            source_id=self._source_id,
            priority=self._priority,
            component_ids=self._pool_ref_store.component_ids,
            report_stream_sender=report_stream_sender,
        )

        forwarding_channel = Broadcast[_Report](
            name=request.get_channel_name() + ":Forwarded",
            resend_latest=True,
        )

        self._pool_ref_store.power_bounds_subs[request.get_channel_name()] = (
            asyncio.create_task(
                self._send_request(forwarding_channel.new_sender(), request)
            )
        )

        return forwarding_channel

# AFTER:
        report_stream_sender, report_stream_receiver = OneshotChannel[
            BroadcastReceiver[_Report]
        ]()
        request = _power_managing.ReportRequest(
            source_id=self._source_id,
            priority=self._priority,
            component_ids=self._pool_ref_store.component_ids,
            report_stream_sender=report_stream_sender,
        )

        forwarding_channel = Broadcast[_Report](
            name=request.get_channel_name() + ":Forwarded",
            resend_latest=True,
        )

        self._pool_ref_store.power_bounds_subs[request.get_channel_name()] = (
            asyncio.create_task(
                self._send_request(
                    forwarding_channel.new_sender(),
                    request,
                    report_stream_receiver,
                )
            )
        )

        return forwarding_channel

_send_request (lines 209–225):

# BEFORE:
    async def _send_request(
        self,
        forwarding_sender: Sender[_Report],
        request: ReportRequest,
    ) -> None:
        """Send the report request and receive the report channel.

        Connect it via pipe to the channel that was returned in power_status.
        """
        await self._pool_ref_store.power_manager_bounds_subs_sender.send(request)

        assert self._report_stream_receiver is not None
        report_receiver: Receiver[_Report] = (
            await self._report_stream_receiver.receive()
        )
        self._pipe = Pipe(report_receiver, forwarding_sender)
        await self._pipe.start()

# AFTER:
    async def _send_request(
        self,
        forwarding_sender: Sender[_Report],
        request: ReportRequest,
        report_stream_receiver: Receiver[Receiver[_Report]],
    ) -> None:
        """Send the report request and receive the report channel.

        Connect it via pipe to the channel that was returned in power_status.
        """
        await self._pool_ref_store.power_manager_bounds_subs_sender.send(request)

        report_receiver: Receiver[_Report] = await report_stream_receiver.receive()
        pipe = Pipe(report_receiver, forwarding_sender)
        self._pipes.append(pipe)
        await pipe.start()

3. _pv_pool.py

Identical structure to EV charger:

__init__ (lines 68–70):

# BEFORE:
        self._report_stream_receiver: Receiver[Receiver[_Report]] | None = None
        # Keep a reference to prevent garbage collector from destroying pipe
        self._pipe: Pipe[_Report] | None = None

# AFTER:
        # Keep references to prevent garbage collector from destroying pipes
        self._pipes: list[Pipe[_Report]] = []

power_status (lines 158–179):

# BEFORE:
        report_stream_sender, self._report_stream_receiver = OneshotChannel[
            BroadcastReceiver[_Report]
        ]()
        ...
        self._pool_ref_store.power_bounds_subs[request.get_channel_name()] = (
            asyncio.create_task(
                self._send_request(forwarding_channel.new_sender(), request)
            )
        )

# AFTER:
        report_stream_sender, report_stream_receiver = OneshotChannel[
            BroadcastReceiver[_Report]
        ]()
        ...
        self._pool_ref_store.power_bounds_subs[request.get_channel_name()] = (
            asyncio.create_task(
                self._send_request(
                    forwarding_channel.new_sender(),
                    request,
                    report_stream_receiver,
                )
            )
        )

_send_request (lines 181–197):

# BEFORE:
    async def _send_request(
        self,
        forwarding_sender: Sender[_Report],
        request: ReportRequest,
    ) -> None:
        ...
        await self._pool_ref_store.power_manager_bounds_subs_sender.send(request)

        assert self._report_stream_receiver is not None
        report_receiver: Receiver[_Report] = (
            await self._report_stream_receiver.receive()
        )
        self._pipe = Pipe(report_receiver, forwarding_sender)
        await self._pipe.start()

# AFTER:
    async def _send_request(
        self,
        forwarding_sender: Sender[_Report],
        request: ReportRequest,
        report_stream_receiver: Receiver[Receiver[_Report]],
    ) -> None:
        ...
        await self._pool_ref_store.power_manager_bounds_subs_sender.send(request)

        report_receiver: Receiver[_Report] = await report_stream_receiver.receive()
        pipe = Pipe(report_receiver, forwarding_sender)
        self._pipes.append(pipe)
        await pipe.start()

Optional Further Hardening: Unique Task Keys

The power_bounds_subs dict uses get_channel_name() as key, which is the same for repeated calls on the same instance. This silently drops the old task reference. While the task still runs (the event loop keeps it alive), stop() can no longer cancel it. A straightforward fix would be to append a uuid to the key:

task_key = f"{request.get_channel_name()}:{uuid.uuid4()}"
self._pool_ref_store._power_bounds_subs[task_key] = asyncio.create_task(...)

This is lower severity since it only affects cleanup, but it prevents orphaned tasks from surviving stop().


Summary

  • Root cause: Shared mutable state (self._report_stream_receiver, self._pipe) written by sync code, read by async tasks
  • Fix principle: Make per-request state task-local: pass the oneshot receiver as a function parameter, collect pipes in a list
  • Files changed: 3 files, ~15 lines each, all mechanical
  • Risk: Very low — the change is strictly narrower in scope (removes shared state), follows existing codebase patterns, and the external contract (ReceiverFetcher return type) is unchanged
  • Precedent in codebase: _resampling.py:63–110, _grid_frequency.py:116–130, _voltage_streamer.py:139–166 all use this local-oneshot pattern correctly

Note

This applies to the other pools too.

            telem_stream_sender=sender,
        )
        await self._data_sourcing_request_sender.send(data_source_request)
        return await receiver.receive()
Contributor


It seems this is introducing a new subtle concurrency bug (also found with AI).

ComponentMetricsResamplingActor processes subscription requests strictly serially in _process_resampling_requests() via async for request ...: await self._subscribe(request) (src/frequenz/sdk/microgrid/_resampling.py:112-115).

Inside that, _subscribe() calls _subscribe_to_data_source() (this method), which sends a data-sourcing request and then waits synchronously for a oneshot reply with return await receiver.receive(). From the channels source, that oneshot wait has no built-in timeout: it wakes only if someone calls send() or aclose(). If neither happens, it can wait forever.

Before the PR a slow upstream setup might delay data arrival, but not subscription registration in the resampling actor. The resampling actor could still keep draining its incoming request queue because it only needed the channel name and local registry wiring.

After the PR a slow upstream setup delays subscription registration itself. Now the resampling actor cannot process later requests until the current one gets its oneshot reply.

Full AI analysis and fix proposal (long)
  • What actually stalls

    • Resampling request A arrives.
    • _process_resampling_requests() starts handling A and blocks in _subscribe_to_data_source() waiting for the data-sourcing reply.
    • Requests B, C, D are still accepted into the resampling actor’s request channel, but they are not processed because the loop never gets past A.
    • The queue is bounded at 500 (src/frequenz/sdk/microgrid/_data_pipeline.py:54-59, 460-468), so after enough blocked requests, senders themselves start back-pressuring.
  • Why the reply can disappear

    • The happy path is MicrogridApiSource._get_metric_senders() sending telem_stream.new_receiver() back through request.telem_stream_sender (src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py:415-420).
    • But there are bad paths where no reply is sent at all. One concrete one is the unknown-component case in add_metric(), which logs and returns without replying (microgrid_api_source.py:521-525).
    • So the resampling actor can end up waiting forever on a reply that will never come.
  • Implications / risk

    • This is broader than “one request fails”: one bad subscription can delay unrelated later subscriptions behind it.
    • It does not necessarily crash the actor, which makes it worse operationally: a stuck await receiver.receive() just hangs; it doesn’t trigger restart/recovery.
    • In practice this means callers waiting for new resampled streams can hang, and under load the request queue can fill and propagate backpressure outward.

Recommended fix

I’d treat this as a two-layer fix:

  1. Fix the root cause in data sourcing
    Data sourcing should never silently drop a request without replying. On invalid requests, it should explicitly fail the oneshot handshake instead of just returning.

  2. Make resampling robust against missing replies anyway
    Even if data sourcing is fixed, resampling should not let one bad handshake stall all later requests.

The safest non-breaking design is:

A. Stop processing subscriptions inline

Instead of this:

async for request in self._resampling_request_receiver:
    await self._subscribe(request)

process each request in its own task and supervise those tasks.

B. Coalesce duplicate in-flight subscriptions

Because concurrent tasks can race on the same request_channel_name, keep a pending_subscriptions map so repeated requests share the same initialization task/future instead of creating duplicate source subscriptions.

C. Bound the oneshot wait

Wrap the data-sourcing reply wait in a timeout, e.g. asyncio.timeout(...), so a lost reply becomes a logged failure instead of a permanent hang.

Suggested shape

Something like this internally:

self._pending_subscriptions: dict[str, asyncio.Task[Broadcast[Sample[Quantity]]]] = {}
self._subscription_tasks: set[asyncio.Task[None]] = set()

Then:

async def _subscribe_to_data_source(
    self, request: ComponentMetricRequest
) -> BroadcastReceiver[Sample[Quantity]]:
    sender, receiver = OneshotChannel[BroadcastReceiver[Sample[Quantity]]]()
    data_source_request = ComponentMetricRequest(
        namespace=request.namespace + ":Source",
        component_id=request.component_id,
        metric=request.metric,
        start_time=request.start_time,
        telem_stream_sender=sender,
    )
    await self._data_sourcing_request_sender.send(data_source_request)
    async with asyncio.timeout(5.0):
        return await receiver.receive()
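(A note on this sketch: asyncio.timeout() is available from Python 3.11; on older versions asyncio.wait_for(receiver.receive(), 5.0) gives the same bound. Either way, a lost reply now surfaces as a timeout exception that can be caught and logged instead of hanging forever.)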

And coalesced initialization:

async def _ensure_subscription(
    self, request: ComponentMetricRequest
) -> Broadcast[Sample[Quantity]]:
    request_channel_name = request.get_channel_name()

    if channel := self._data_sink_channels.get(request_channel_name):
        return channel

    if pending := self._pending_subscriptions.get(request_channel_name):
        return await pending

    async def init() -> Broadcast[Sample[Quantity]]:
        data_source = await self._subscribe_to_data_source(request)
        data_sink_channel = Broadcast(name=request_channel_name, resend_latest=True)
        self._data_sink_channels[request_channel_name] = data_sink_channel
        self._resampler.add_timeseries(
            name=request_channel_name,
            source=data_source,
            sink=data_sink_channel.new_sender().send,
        )
        return data_sink_channel

    task = asyncio.create_task(init(), name=f"init:{request_channel_name}")
    self._pending_subscriptions[request_channel_name] = task
    try:
        return await task
    finally:
        self._pending_subscriptions.pop(request_channel_name, None)
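One design note on this sketch: the finally block clears the pending entry whether init() succeeds or fails, so a failed handshake is retried by the next request instead of being cached as a permanently broken task.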

Then the request loop can stay responsive:

async def _handle_subscription_request(self, request: ComponentMetricRequest) -> None:
    channel = await self._ensure_subscription(request)
    await request.telem_stream_sender.send(channel.new_receiver())

async def _process_resampling_requests(self) -> None:
    async for request in self._resampling_request_receiver:
        task = asyncio.create_task(
            self._handle_subscription_request(request),
            name=f"subscribe:{request.get_channel_name()}",
        )
        self._subscription_tasks.add(task)
        task.add_done_callback(self._subscription_tasks.discard)
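One caveat with this sketch: add_done_callback(...discard) only removes finished tasks from the set; a real supervisor would also want to surface exceptions from failed subscription tasks, for example by logging task.exception() in the done callback, so failures don't pass silently.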

Why this is the best fix

  • It removes the head-of-line blocking.
  • It preserves the current external API.
  • It prevents duplicate initialization races for the same request.
  • It turns “wait forever” into a bounded failure.
  • It still works even if data sourcing regresses again later.

If you want, I can also turn this into a minimal patch against _resampling.py and sketch the matching data-sourcing change.

try:
    await sub.report_stream_sender.send(channel.new_receiver())
except SenderClosedError:
    pass
Contributor
Also here, I would always document why we ignore an error, with a debug log line. I feel that if something goes wrong here, we'll otherwise be too blind when trying to figure out what happened.
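A minimal sketch of what that could look like, reusing the names from the hunk above and assuming a module-level _logger exists:

try:
    await sub.report_stream_sender.send(channel.new_receiver())
except SenderClosedError:
    # The requester already closed its end; drop the receiver, but leave
    # a trace so this path is visible when debugging.
    _logger.debug("Report stream receiver dropped: requester closed its channel.")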

@llucax llucax added this to the v1.0.0-rc2206 milestone Apr 22, 2026
@simonvoelcker
Contributor Author

@llucax thank you for raising these points. I agree that we're adding a lot of potentially problematic code here, and it will be even more in phase 2 (teardown of closed channels / propagating closed senders) which I have lying around on a branch locally.

I am okay with closing this PR. I am not married to it just because I wrote it - this was simply my second task overall, and I did not yet foresee the consequences or have enough of an overview myself to propose a better alternative. @shsms what do you think?

llucax added a commit to llucax/frequenz-sdk-python that referenced this pull request Apr 27, 2026
Add targeted tests adapted to the v1.x.x branch that verify behavior for
issues identified in the pull request frequenz-floss#1371:

- Data sourcing: Verify that duplicate subscriptions and unknown
  components with the same channel name do not hang. There is also a
  test for invalid metrics which is currently failing (it hangs); since
  we plan to remove the data sourcing actor, we defer that fix for now.

- Resampling: Verify invalid metric request does not block later valid
  request.

- Power status subscriptions (battery, EV charger, PV pools): Verify
  same-instance subscriptions share the registry channel correctly.

Signed-off-by: Leandro Lucarella <luca-frequenz@llucax.com>
@llucax
Contributor

llucax commented Apr 27, 2026

Sorry for the late reply, I somehow missed the notification.

@llucax thank you for raising these points. I agree that we're adding a lot of potentially problematic code here, and it will be even more in phase 2 (teardown of closed channels / propagating closed senders) which I have lying around on a branch locally.

OK, interesting. I was expecting auto-closing to actually simplify the code, but you are saying it makes things even more complex? Maybe it is worth creating a draft PR with it so we can get a better idea of how it makes things better or worse, at least if you have something that could be useful to share.

I am okay with closing this PR. I am not married to it just because I wrote it - this was simply my second task overall, and I did not yet foresee the consequences or have enough of an overview myself to propose a better alternative.

Yeah, to be honest I also didn't see this coming. I think removing the channel registry is still a good thing, as it has its own issues; it's just that I wouldn't remove it at all costs.

simonvoelcker pushed a commit to simonvoelcker/frequenz-sdk-python that referenced this pull request May 5, 2026
…1392)
