Remove channel registry #1371
Sorry to be a party pooper. I just started with the review, and looking at the data sourcing actor, I think we might be overcomplicating things. Why can't we just reverse the ownership and make the requester own the "data receiving channel"? Instead of creating a oneshot channel, the requester creates a normal channel and gives the other end the sender. That's it! No back-and-forth sending and receiving of receivers.

The only disadvantage of this approach is that we can't de-duplicate channels, so no broadcasting like we do now (the data sourcing actor creates one channel per metric-cid combination, I think, and creates new receivers for any new party asking for data). We can still de-duplicate by subscribing to the microgrid API once and sending to multiple senders, but we'll still have one channel per sender. Broadcast channels use one queue per receiver anyway, so in terms of memory usage it shouldn't change much.

This is a very early thought, and I haven't thought through all the other potential issues, but looking at the changes, it really feels to me that we are introducing non-trivial complexity here. For example, the new requirement of being async to subscribe to data goes away with this approach. I think that's my main concern: we are converting something that is inherently sync (creating a new receiver to receive data) into something async, and that's very disruptive. I will continue with my review, but since we are planning to build on top of this, I wanted to raise it as soon as possible.
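A minimal sketch of the reversed-ownership idea, using plain `asyncio.Queue`s as stand-ins for channels (all names are illustrative, not the SDK's API): the requester creates its own data queue synchronously, and only the sending end travels to the actor inside the request.

```python
import asyncio


async def data_sourcing_actor(requests: asyncio.Queue) -> None:
    """Consume subscription requests and stream samples into each caller's queue."""
    while True:
        sender = await requests.get()
        if sender is None:  # shutdown sentinel
            break
        # One stream per request; real code would multiplex the microgrid API.
        for sample in (1.0, 2.0, 3.0):
            await sender.put(sample)


async def main() -> list[float]:
    requests: asyncio.Queue = asyncio.Queue()
    actor = asyncio.create_task(data_sourcing_actor(requests))

    # The requester creates its own data channel *synchronously* ...
    my_data: asyncio.Queue = asyncio.Queue()
    # ... and only the request (carrying the sending end) goes over a channel.
    await requests.put(my_data)

    received = [await my_data.get() for _ in range(3)]
    await requests.put(None)
    await actor
    return received


samples = asyncio.run(main())
print(samples)  # [1.0, 2.0, 3.0]
```

No oneshot handshake is needed: the caller already holds the receiving end before the request is even sent.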
@llucax I completely agree that this change adds regrettable complexity, and we should explore ways of avoiding it. It is my understanding that channels must be de-duplicated somehow, because we might be dealing with hundreds of components in practice. The channels must then be owned centrally, whether that's near the source or in a singleton registry. Maybe we can revisit the registry idea, but store senders there instead of channels? That would avoid the oneshot channels.

Regarding sync-vs-async: the creation of channels was async before and stays async, no? We're sending ComponentMetricRequests either way; it's just that in the one case we discussed today (GridFrequency.new_receiver) a receiver must be returned synchronously, so we have to use the forwarding channel.
The problem with the singleton channel registry, for me, is that when things go wrong it is very hard to find out when a channel was first created, who requested it, etc. This problem came up in practice and was very hard to debug. That's why I added the option to log when a channel is created in the registry (including the stack trace), but that turns out to pollute the logs excessively when enabled, since you can't filter it by channel name, so it is printed for all channels. This is why I think getting rid of the channel registry is a good goal, but I think we agree: not at all cost.

About the de-duplication: as I mentioned, it might be worth taking a deeper look at how much overhead having one channel per metric-cid really adds vs. the current approach of having one receiver per metric-cid. Broadcast channels have a queue per receiver, and the sender just iterates over all receivers and puts the new message in each queue, so I'm not sure there would be a big difference between having 1 broadcast channel with N receivers vs. having N "unicast channels" (for now I guess we'd use a broadcast channel for those too).

Note: we explored implementing a unicast channel in the past, but I think it was as slow as the broadcast channel, so we kept the broadcast channel only. I guess for unicast we could just use an asyncio queue; I'm not sure if that is what we used to implement the unicast channel.
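The queue-per-receiver point can be sketched with a toy broadcast channel (illustrative only, not how `frequenz.channels` is implemented): whether you have one broadcast channel with N receivers or N separate channels, there are N queues each holding a copy of the message either way.

```python
from collections import deque


class MiniBroadcast:
    """Toy broadcast channel: one queue per receiver, like the real one."""

    def __init__(self) -> None:
        self._queues: list[deque] = []

    def new_receiver(self) -> deque:
        q: deque = deque()
        self._queues.append(q)
        return q

    def send(self, msg: object) -> None:
        # The sender iterates over all receivers and puts the new
        # message in each receiver's own queue.
        for q in self._queues:
            q.append(msg)


chan = MiniBroadcast()
receivers = [chan.new_receiver() for _ in range(3)]
chan.send("sample")
per_receiver_counts = [len(q) for q in receivers]
print(per_receiver_counts)  # [1, 1, 1]: one queued copy per receiver
```

So the memory footprint of N unicast channels should be roughly comparable to one broadcast channel with N receivers.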
llucax
left a comment
I will send this partial review, as all commits were split while I was mid-review. Will wait until things settle down to continue the review.
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
I guess with the channels release this is ready for a final review; let me know (request a review) when it is ready for another round.
@llucax I think it's ready again. Tbh I also lost some context since the last time I worked on this, and I kinda ran out of steam at the end. I would appreciate it if we can move this over the line so I can continue with phase 2.
Oh, it needs a conflict in the benchmark resolved now, but I will review anyway; it is a very isolated part.
@llucax yeah, I saw those and already resolved them locally. I'll push after your review then, so as not to interfere. It's isolated, as you say.
llucax
left a comment
I hate to revive old discussions, but the more we insist on this approach of using oneshot channels to replace the channel registry, the more I'm convinced it is not the right one.
In this review round I found 3 serious and very obscure concurrency bugs. To be honest, I'm not sure I would have caught these issues if I hadn't used AI, as some of them can only be understood by looking at parts of the code that are not even in the diff (I asked for the AI review first, so we'll never know). I still spent about 3 hours trying to understand and validate the issues, so it is also not like I just asked AI and it was zero effort. That said, it is also possible that I didn't get it right and some issue is not that serious, or extremely unlikely, but still...
This makes me not very confident that we have even uncovered all the potential issues with this approach, so I suggest stepping back again and thinking about whether we really want to move forward with it.
My feeling is that we are planting a minefield here, and for what? Why are we removing the channel registry? If it is because of auto-closing, can't we just introduce auto-closing while keeping the channel registry? Maybe we can keep the channel class non-deprecated and allow both usages of channels, one owning the whole channel and one driven by senders/receivers, or we could have some other mechanism to spawn more senders/receivers for an existing channel as long as it is not closed.
I'm sure we can find a solution, but to me, adding this level of complexity is not acceptable, and it will haunt us in the future: even if we get this PR right and bug-free, we'll need to add more code on top that will have the same likelihood of acquiring obscure concurrency bugs.
In any case, I also asked the AI to add test cases that expose these issues. I think it makes sense to add them, and here is a PR to add them to v1.x.x:
New test cases diff for this branch
[!NOTE]
This is an old iteration, I wanted to update it but I'm hitting some rate limiting :D
diff --git a/tests/actor/test_resampling.py b/tests/actor/test_resampling.py
index 6e8ee9389..f9e6dce73 100644
--- a/tests/actor/test_resampling.py
+++ b/tests/actor/test_resampling.py
@@ -266,3 +266,47 @@ def _now
)
await resampling_actor._resampler.stop() # pylint: disable=protected-access
+
+
+async def test_stalled_request_does_not_block_later_subscriptions() -> None:
+ """Ensure one stalled data-source handshake doesn't block later requests."""
+ data_source_req_chan = Broadcast[ComponentMetricRequest](name="data-source-req")
+ data_source_req_recv = data_source_req_chan.new_receiver()
+ resampling_req_chan = Broadcast[ComponentMetricRequest](name="resample-req")
+ resampling_req_sender = resampling_req_chan.new_sender()
+
+ async with ComponentMetricsResamplingActor(
+ data_sourcing_request_sender=data_source_req_chan.new_sender(),
+ resampling_request_receiver=resampling_req_chan.new_receiver(),
+ config=ResamplerConfig2(
+ resampling_period=timedelta(seconds=0.2),
+ max_data_age_in_periods=2,
+ ),
+ ):
+ first_sender, _ = OneshotChannel[BroadcastReceiver[Sample[Quantity]]]()
+ second_sender, _ = OneshotChannel[BroadcastReceiver[Sample[Quantity]]]()
+
+ first_request = ComponentMetricRequest(
+ namespace="Resampling-A",
+ component_id=ComponentId(9),
+ metric=Metric.BATTERY_SOC_PCT,
+ start_time=None,
+ telem_stream_sender=first_sender,
+ )
+ second_request = ComponentMetricRequest(
+ namespace="Resampling-B",
+ component_id=ComponentId(10),
+ metric=Metric.AC_ACTIVE_POWER,
+ start_time=None,
+ telem_stream_sender=second_sender,
+ )
+
+ await resampling_req_sender.send(first_request)
+ stalled_data_source_request = await data_source_req_recv.receive()
+ assert stalled_data_source_request.component_id == ComponentId(9)
+
+ await resampling_req_sender.send(second_request)
+ later_data_source_request = await asyncio.wait_for(
+ data_source_req_recv.receive(), timeout=0.1
+ )
+ assert later_data_source_request.component_id == ComponentId(10)
diff --git a/tests/microgrid/test_data_sourcing.py b/tests/microgrid/test_data_sourcing.py
index c18031bc9..577eee20c 100644
--- a/tests/microgrid/test_data_sourcing.py
+++ b/tests/microgrid/test_data_sourcing.py
@@ -11,7 +11,13 @@
import pytest
import pytest_mock
-from frequenz.channels import Broadcast, BroadcastReceiver, OneshotChannel
+from frequenz.channels import (
+ Broadcast,
+ BroadcastReceiver,
+ OneshotChannel,
+ Receiver,
+ ReceiverStoppedError,
+)
from frequenz.client.common.microgrid import MicrogridId
from frequenz.client.common.microgrid.components import ComponentId
from frequenz.client.microgrid.component import (
@@ -37,11 +43,30 @@
)
from frequenz.sdk.timeseries import Sample
+from ..utils.receive_timeout import Timeout
+
T = TypeVar("T", bound=ComponentData)
+U = TypeVar("U")
_MICROGRID_ID = MicrogridId(1)
+class Stopped:
+ """Sentinel for a closed receiver."""
+
+
+async def _receive_or_timeout(
+ receiver: Receiver[U], timeout: float = 0.1
+) -> U | type[Timeout] | type[Stopped]:
+ """Receive a message, or return a sentinel if the receiver stops or times out."""
+ try:
+ return await asyncio.wait_for(receiver.receive(), timeout=timeout)
+ except ReceiverStoppedError:
+ return Stopped
+ except asyncio.TimeoutError:
+ return Timeout
+
+
@pytest.fixture
def mock_connection_manager(mocker: pytest_mock.MockFixture) -> mock.Mock:
"""Fixture for getting a mock connection manager."""
@@ -124,6 +149,102 @@ def mock_connection_manager
assert expected_sample_value + i == sample.value.base_value
+async def test_duplicate_requests_reply_to_all_callers(
+ mock_connection_manager: mock.Mock, # pylint: disable=redefined-outer-name,unused-argument
+) -> None:
+ """Ensure duplicate direct subscriptions reply to every requester."""
+ req_chan = Broadcast[ComponentMetricRequest](name="data_sourcing_requests")
+ req_sender = req_chan.new_sender()
+
+ async with DataSourcingActor(req_chan.new_receiver()):
+ first_sender, first_receiver = OneshotChannel[
+ BroadcastReceiver[Sample[Quantity]]
+ ]()
+ second_sender, second_receiver = OneshotChannel[
+ BroadcastReceiver[Sample[Quantity]]
+ ]()
+
+ request = ComponentMetricRequest(
+ "test-namespace",
+ ComponentId(4),
+ Metric.AC_ACTIVE_POWER,
+ None,
+ first_sender,
+ )
+ duplicate_request = ComponentMetricRequest(
+ "test-namespace",
+ ComponentId(4),
+ Metric.AC_ACTIVE_POWER,
+ None,
+ second_sender,
+ )
+
+ await req_sender.send(request)
+ _ = await asyncio.wait_for(first_receiver.receive(), timeout=1.0)
+
+ await req_sender.send(duplicate_request)
+ _ = await asyncio.wait_for(second_receiver.receive(), timeout=1.0)
+
+
+async def test_unknown_component_request_does_not_wait_forever(
+ mock_connection_manager: mock.Mock, # pylint: disable=redefined-outer-name,unused-argument
+) -> None:
+ """Ensure invalid component requests fail fast instead of hanging callers."""
+ req_chan = Broadcast[ComponentMetricRequest](name="data_sourcing_requests")
+ req_sender = req_chan.new_sender()
+
+ async with DataSourcingActor(req_chan.new_receiver()):
+ telem_stream_sender, telem_stream_receiver = OneshotChannel[
+ BroadcastReceiver[Sample[Quantity]]
+ ]()
+ request = ComponentMetricRequest(
+ "test-namespace",
+ ComponentId(999),
+ Metric.AC_ACTIVE_POWER,
+ None,
+ telem_stream_sender,
+ )
+
+ await req_sender.send(request)
+ result = await _receive_or_timeout(telem_stream_receiver)
+ assert result is not Timeout
+
+
+async def test_invalid_metric_request_does_not_block_later_requests(
+ mock_connection_manager: mock.Mock, # pylint: disable=redefined-outer-name,unused-argument
+) -> None:
+ """Ensure a bad request doesn't stall later valid subscriptions."""
+ req_chan = Broadcast[ComponentMetricRequest](name="data_sourcing_requests")
+ req_sender = req_chan.new_sender()
+
+ async with DataSourcingActor(req_chan.new_receiver()):
+ invalid_sender, _ = OneshotChannel[BroadcastReceiver[Sample[Quantity]]]()
+ valid_sender, valid_receiver = OneshotChannel[
+ BroadcastReceiver[Sample[Quantity]]
+ ]()
+
+ invalid_request = ComponentMetricRequest(
+ "test-namespace-invalid",
+ ComponentId(4),
+ Metric.BATTERY_SOC_PCT,
+ None,
+ invalid_sender,
+ )
+ valid_request = ComponentMetricRequest(
+ "test-namespace-valid",
+ ComponentId(4),
+ Metric.AC_ACTIVE_POWER,
+ None,
+ valid_sender,
+ )
+
+ await req_sender.send(invalid_request)
+ await req_sender.send(valid_request)
+
+ result = await _receive_or_timeout(valid_receiver)
+ assert result is not Timeout
+
+
def _new_meter_data(
component_id: ComponentId, timestamp: datetime, value: float
) -> MeterData:
diff --git a/tests/timeseries/test_power_status_subscription_races.py b/tests/timeseries/test_power_status_subscription_races.py
new file mode 100644
index 000000000..a99ce5c02
--- /dev/null
+++ b/tests/timeseries/test_power_status_subscription_races.py
@@ -0,0 +1,140 @@
+# License: MIT
+# Copyright © 2026 Frequenz Energy-as-a-Service GmbH
+
+"""Regression tests for pool power_status subscription races."""
+
+import asyncio
+from types import SimpleNamespace
+
+from frequenz.channels import Broadcast, Receiver, Sender
+from frequenz.client.common.microgrid.components import ComponentId
+from frequenz.quantities import Power
+
+from frequenz.sdk import timeseries
+from frequenz.sdk.microgrid._power_managing import ReportRequest, _Report
+from frequenz.sdk.timeseries.battery_pool._battery_pool import BatteryPool
+from frequenz.sdk.timeseries.ev_charger_pool._ev_charger_pool import EVChargerPool
+from frequenz.sdk.timeseries.pv_pool._pv_pool import PVPool
+
+
+def _new_report(target_power_watts: float) -> _Report:
+ """Create a distinct report for subscription assertions."""
+ target_power = Power.from_watts(target_power_watts)
+ return _Report(
+ target_power=target_power,
+ _inclusion_bounds=timeseries.Bounds(target_power, target_power),
+ _exclusion_bounds=None,
+ )
+
+
+async def _assert_same_instance_subscriptions_are_independent(
+ pool: BatteryPool | EVChargerPool | PVPool,
+ requests_rx: Receiver[ReportRequest],
+) -> None:
+ """Ensure two subscriptions on the same pool instance remain independent."""
+ existing_tasks = {
+ task for task in asyncio.all_tasks() if task is not asyncio.current_task()
+ }
+
+ try:
+ first_status_rx = pool.power_status.new_receiver()
+ second_status_rx = pool.power_status.new_receiver()
+
+ await asyncio.sleep(0)
+
+ first_request = await asyncio.wait_for(requests_rx.receive(), timeout=1.0)
+ second_request = await asyncio.wait_for(requests_rx.receive(), timeout=1.0)
+
+ first_report_channel = Broadcast[_Report](name="first-report", resend_latest=True)
+ second_report_channel = Broadcast[_Report](
+ name="second-report", resend_latest=True
+ )
+
+ await first_request.report_stream_sender.send(first_report_channel.new_receiver())
+ await second_request.report_stream_sender.send(
+ second_report_channel.new_receiver()
+ )
+
+ await asyncio.sleep(0)
+
+ first_report = _new_report(100.0)
+ second_report = _new_report(200.0)
+
+ await first_report_channel.new_sender().send(first_report)
+ await second_report_channel.new_sender().send(second_report)
+
+ received_first_report = await asyncio.wait_for(
+ first_status_rx.receive(), timeout=0.1
+ )
+ received_second_report = await asyncio.wait_for(
+ second_status_rx.receive(), timeout=0.1
+ )
+
+ assert received_first_report.target_power == first_report.target_power
+ assert received_second_report.target_power == second_report.target_power
+ finally:
+ created_tasks = [
+ task
+ for task in asyncio.all_tasks()
+ if task is not asyncio.current_task() and task not in existing_tasks
+ ]
+ for task in created_tasks:
+ task.cancel()
+ await asyncio.gather(*created_tasks, return_exceptions=True)
+
+
+async def test_battery_pool_power_status_same_instance_subscriptions_are_independent() -> None:
+ """Ensure battery pool power_status subscriptions don't race each other."""
+ requests_channel = Broadcast[ReportRequest](name="battery-pool-requests")
+ pool = BatteryPool(
+ pool_ref_store=SimpleNamespace(
+ _batteries=frozenset({ComponentId(8), ComponentId(18)}),
+ _power_manager_bounds_subscription_sender=requests_channel.new_sender(),
+ _power_bounds_subs={},
+ ),
+ name="battery-pool",
+ priority=5,
+ )
+
+ await _assert_same_instance_subscriptions_are_independent(
+ pool,
+ requests_channel.new_receiver(),
+ )
+
+
+async def test_ev_charger_pool_power_status_same_instance_subscriptions_are_independent() -> None:
+ """Ensure EV charger pool power_status subscriptions don't race each other."""
+ requests_channel = Broadcast[ReportRequest](name="ev-pool-requests")
+ pool = EVChargerPool(
+ pool_ref_store=SimpleNamespace(
+ component_ids=frozenset({ComponentId(12), ComponentId(22)}),
+ power_manager_bounds_subs_sender=requests_channel.new_sender(),
+ power_bounds_subs={},
+ ),
+ name="ev-pool",
+ priority=5,
+ )
+
+ await _assert_same_instance_subscriptions_are_independent(
+ pool,
+ requests_channel.new_receiver(),
+ )
+
+
+async def test_pv_pool_power_status_same_instance_subscriptions_are_independent() -> None:
+ """Ensure PV pool power_status subscriptions don't race each other."""
+ requests_channel = Broadcast[ReportRequest](name="pv-pool-requests")
+ pool = PVPool(
+ pool_ref_store=SimpleNamespace(
+ component_ids=frozenset({ComponentId(28), ComponentId(38)}),
+ power_manager_bounds_subs_sender=requests_channel.new_sender(),
+ power_bounds_subs={},
+ ),
+ name="pv-pool",
+ priority=5,
+ )
+
+ await _assert_same_instance_subscriptions_are_independent(
+ pool,
+ requests_channel.new_receiver(),
+    )

```python
        for existing_request in self._req_streaming_metrics[comp_id][request.metric]:
            if existing_request.get_channel_name() == request.get_channel_name():
                # the requested metric is already being handled, so nothing to do.
                return

        self._req_streaming_metrics[comp_id][request.metric].append(request)

        await self._update_streams(
            comp_id,
            category,
        )
```
It seems we have a problem here. Found by AI, but it seems to make sense:
If the requested metric was already requested or the component ID is unknown (category is None), we just return, which means we don't call self._update_streams(), which means we don't call self._handle_data_stream(), which means we don't call self._get_metric_senders(), which means we don't call await request.telem_stream_sender.send(telem_stream.new_receiver()), which means we never reply to the requester.
I guess we can reply here before the return, but this is getting very hard to follow, so I'm not sure if we can find a better approach. Maybe do the replying completely outside that call stack, in this function, right after we either created the new channel or got the existing one, but I guess that implies awaiting on the task that is creating the new channel.
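The failure mode can be reproduced with a toy model, using asyncio futures as stand-ins for oneshot channels and a stripped-down `add_metric` (all names are illustrative, not the SDK's actual code): the deduped second request's reply is simply never sent, so its caller would wait forever.

```python
import asyncio


async def add_metric(registered: dict, key: str, reply: asyncio.Future) -> None:
    """Toy version of the dedupe logic: only non-duplicate requests get a reply."""
    if key in registered:
        return  # "nothing to do" -- but this request's reply is never sent!
    registered[key] = reply
    reply.set_result("receiver")  # only the first requester is answered


async def main() -> tuple[bool, bool]:
    registered: dict = {}
    loop = asyncio.get_running_loop()
    first: asyncio.Future = loop.create_future()
    second: asyncio.Future = loop.create_future()
    await add_metric(registered, "comp-4:AC_ACTIVE_POWER", first)
    await add_metric(registered, "comp-4:AC_ACTIVE_POWER", second)  # deduped
    return first.done(), second.done()


first_done, second_done = asyncio.run(main())
print(first_done, second_done)  # True False
```

Anyone awaiting the second future hangs with no error, which matches the "silent, sticky" characterization below.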
Full AI analysis including a proposed fix (long)
What goes wrong
A direct caller now creates a fresh oneshot channel for every subscription request and waits for a BroadcastReceiver to be handed back.
You can see that in:
- `GridFrequency.subscribe()` at `src/frequenz/sdk/timeseries/_grid_frequency.py:116-130`
- `GridFrequency.new_receiver()`'s background path at `src/frequenz/sdk/timeseries/_grid_frequency.py:107-114, 132-140`
- `ResampledStreamFetcher.fetch_stream()` at `src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py:52-65`
Each of those callers expects this sequence:
- create a `OneshotChannel`
- send a `ComponentMetricRequest(..., telem_stream_sender=...)`
- wait on `telem_stream_receiver.receive()`
The request object itself makes the dedupe key explicit:
- `ComponentMetricRequest.get_channel_name()` is based on `namespace`, `component_id`, `metric`, and optional `start_time`
- see `src/frequenz/sdk/microgrid/_data_sourcing/_component_metric_request.py:59-73`
So two requests for the same logical stream have the same get_channel_name(), but different oneshot senders.
The exact failure path
In MicrogridApiSource.add_metric():
```python
for existing_request in self._req_streaming_metrics[comp_id][request.metric]:
    if existing_request.get_channel_name() == request.get_channel_name():
        # the requested metric is already being handled, so nothing to do.
        return
```

That is at:
src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py:531-534
The bug is that “nothing to do” is false now.
Why? Because the only place that replies to a caller’s oneshot sender is later, in _get_metric_senders():
src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py:412-423
Specifically:
```python
await request.telem_stream_sender.send(telem_stream.new_receiver())
```

But `_get_metric_senders()` only iterates over the requests that are still present in `req_list`.
So if request B is deduped against request A:
- A stays in `req_list`
- B is dropped early
- only A gets its oneshot reply
- B waits forever on `receive()`
Why this hangs forever
The second caller is waiting on a oneshot receiver that will never get a value.
There is no timeout in:
GridFrequency.subscribe()(_grid_frequency.py:128-129)GridFrequency._send_request()(_grid_frequency.py:138-140)ResampledStreamFetcher.fetch_stream()(_resampled_stream_fetcher.py:64-65)
So if no one ever sends on that oneshot sender, the waiter just sits there.
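As a sketch of a possible mitigation (not something the code currently does, as the review notes), each of these call sites could wrap the oneshot receive in `asyncio.wait_for`, turning a silent permanent hang into a diagnosable error. Here a pending future stands in for a oneshot receiver that is never answered:

```python
import asyncio


async def main() -> str:
    # Stand-in for a oneshot receiver whose sender never replies.
    never_answered: asyncio.Future = asyncio.get_running_loop().create_future()
    try:
        # A caller-side timeout bounds the wait for the subscription reply.
        await asyncio.wait_for(never_answered, timeout=0.05)
        return "received"
    except asyncio.TimeoutError:
        return "timed out waiting for subscription reply"


outcome = asyncio.run(main())
print(outcome)  # timed out waiting for subscription reply
```

This doesn't fix the dedupe bug itself, but it would make the hang observable instead of sticky.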
Why subscribe() + deprecated new_receiver() is broken too
GridFrequency has two request paths now:
New path
`subscribe()` creates a fresh oneshot sender every time (`_grid_frequency.py:118-126`).
Deprecated path
`new_receiver()` uses a pre-created oneshot receiver stored on the instance and starts `_send_request()` once (`_grid_frequency.py:107-114`, `_grid_frequency.py:132-140`).
Both paths generate requests with the same logical stream key:
- namespace = `"grid-frequency"`
- same component id
- same metric = `AC_FREQUENCY`
So they collide in add_metric().
That means:
- if `new_receiver()` gets there first, then `subscribe()` can be deduped and hang forever
- if `subscribe()` gets there first, then the background `_send_request()` started by `new_receiver()` can hang forever waiting on `self._telem_stream_receiver.receive()`
So the returned receiver from new_receiver() may exist, but it never gets wired to the underlying telemetry stream.
Practical impact
This is high severity because it is:
- deterministic once the duplicate order happens
- silent — no exception, just a permanent wait
- user-facing — it affects public subscription APIs
- sticky — the waiting task doesn’t recover on its own
The fix
The right non-breaking fix is:
dedupe the underlying stream, but still reply to every caller
In other words:
- keep only one upstream telemetry channel per `get_channel_name()`
- but hand out a fresh `BroadcastReceiver` to each caller's oneshot sender
Important subtlety
A naïve fix would be “just append the duplicate request too”.
That does not work, because _get_metric_senders() would then create multiple senders for the same broadcast channel and send duplicate samples into the same stream.
So the fix has to separate:
- canonical stream registration
- pending caller replies
Suggested fix shape
Introduce a pending-replies map keyed by channel_name, for example:
```python
self._pending_replies: dict[
    str, list[OneshotSender[BroadcastReceiver[Sample[Quantity]]]]
] = {}
```

In `add_metric()`
- always register the caller's oneshot sender in `_pending_replies[channel_name]`
- if this is a duplicate and the channel already exists, reply immediately
- if this is a duplicate and the channel does not exist yet, return and let the eventual stream setup flush all pending replies
- only the first request should be appended to `_req_streaming_metrics`
Sketch:
```python
async def add_metric(self, request: ComponentMetricRequest) -> None:
    comp_id = request.component_id
    category = await self._get_component_category(comp_id)
    if category is None:
        _logger.error("Unknown component ID: %d in request %s", comp_id, request)
        return
    channel_name = request.get_channel_name()
    self._pending_replies.setdefault(channel_name, []).append(
        request.telem_stream_sender
    )
    self._req_streaming_metrics.setdefault(comp_id, {}).setdefault(request.metric, [])
    for existing_request in self._req_streaming_metrics[comp_id][request.metric]:
        if existing_request.get_channel_name() == channel_name:
            if channel_name in self._channels:
                await self._flush_pending_replies(channel_name)
            return
    self._req_streaming_metrics[comp_id][request.metric].append(request)
    await self._update_streams(comp_id, category)
```

Helper to answer everyone waiting for that logical stream
```python
async def _flush_pending_replies(
    self, channel_name: str
) -> BroadcastSender[Sample[Quantity]]:
    if channel_name not in self._channels:
        self._channels[channel_name] = Broadcast(name=channel_name)
    telem_stream = self._channels[channel_name]
    for sender in self._pending_replies.pop(channel_name, []):
        try:
            await sender.send(telem_stream.new_receiver())
        except SenderClosedError:
            pass
    return telem_stream.new_sender()
```

In `_get_metric_senders()`
Deduplicate by channel_name, reply to all pending callers for that channel, and create only one broadcast sender per logical stream:
```python
async def _get_metric_senders(...):
    all_senders = []
    for metric, req_list in requests.items():
        extraction_method = self._get_data_extraction_method(category, metric)
        senders = []
        seen_channel_names: set[str] = set()
        for request in req_list:
            channel_name = request.get_channel_name()
            if channel_name in seen_channel_names:
                continue
            seen_channel_names.add(channel_name)
            senders.append(await self._flush_pending_replies(channel_name))
        all_senders.append((extraction_method, senders))
    return all_senders
```

Why this fix is good
It preserves all the good parts of the current design:
- still one underlying stream per logical metric request
- still one broadcast per logical stream
- still one fresh receiver per caller
- still non-breaking API
And it fixes both cases:
- duplicate direct subscriptions
- `subscribe()` mixed with deprecated `new_receiver()`
One important note
This fix addresses the duplicate request bug.
It does not fix the separate invalid-request bug where data sourcing can return/raise without replying to the oneshot at all. That still needs its own explicit error/close path.
```python
try:
    await request.telem_stream_sender.send(telem_stream.new_receiver())
except SenderClosedError:
    pass
```
I wonder how this could happen and not be an issue: someone would have to create the oneshot channel, send the request, and close the receiver/sender before receiving. I guess that could be a valid way to cancel a request. In any case, I would add at least a DEBUG log here saying why we are ignoring this exception (something like "assuming the request was cancelled by the requester").
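A sketch of what that DEBUG log could look like. `SenderClosedError` is redefined locally here as a stand-in for the channels library's exception, and `reply_to_requester` is a hypothetical helper, not existing SDK code:

```python
import logging

_logger = logging.getLogger(__name__)


class SenderClosedError(Exception):
    """Stand-in for the channels library's SenderClosedError."""


def reply_to_requester(send_reply) -> bool:
    """Try to reply to a subscription request; log and ignore a closed sender."""
    try:
        send_reply()
        return True
    except SenderClosedError:
        _logger.debug(
            "Ignoring SenderClosedError while replying to a subscription "
            "request: assuming the request was cancelled by the requester."
        )
        return False


def _closed_sender() -> None:
    # Simulates a requester that closed its oneshot before receiving.
    raise SenderClosedError()


replied = reply_to_requester(_closed_sender)
print(replied)  # False
```

The point is just that the silent `pass` becomes a traceable breadcrumb when the log level is turned up.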
```diff
     @property
     def power_status(self) -> ReceiverFetcher[BatteryPoolReport]:
         """Get a receiver to receive new power status reports when they change.

         These include
         - the current inclusion/exclusion bounds available for the pool's priority,
         - the current target power for the pool's set of batteries,
         - the result of the last distribution request for the pool's set of batteries.

         Returns:
             A receiver that will stream power status reports for the pool's priority.
         """
-        sub = _power_managing.ReportRequest(
+        report_stream_sender, self._report_stream_receiver = OneshotChannel[
+            BroadcastReceiver[_Report]
+        ]()
+        request = _power_managing.ReportRequest(
             source_id=self._source_id,
             priority=self._priority,
             component_ids=self._pool_ref_store._batteries,
+            report_stream_sender=report_stream_sender,
         )

-        self._pool_ref_store._power_bounds_subs[sub.get_channel_name()] = (
+        forwarding_channel = Broadcast[_Report](
+            name=request.get_channel_name() + ":Forwarded",
+            resend_latest=True,
+        )
+
+        self._pool_ref_store._power_bounds_subs[request.get_channel_name()] = (
             asyncio.create_task(
-                self._pool_ref_store._power_manager_bounds_subscription_sender.send(sub)
+                self._send_request(forwarding_channel.new_sender(), request)
             )
         )
-        channel = self._pool_ref_store._channel_registry.get_or_create(
-            _power_managing._Report, sub.get_channel_name()
-        )
+
+        return forwarding_channel

     async def _send_request(
```
It seems like we have a race condition here. Also an AI-assisted finding, but it looks legit to me.
If power_status is called twice in quick succession, self._report_stream_receiver is overwritten, losing the reference to the first oneshot channel receiver. The same could happen to self._pipe, but for that the first call would need to have given execution time to the _send_request() task so that self._pipe is actually assigned, and then while await self._pipe.start() is waiting, a new task could override it too. I guess this is much more unlikely, but still possible.
There is also a potential race on self._pool_ref_store._power_bounds_subs[sub.get_channel_name()], which can lead to losing the reference to the task object. But I think this issue predates this PR, as we already had the task and self._pool_ref_store._power_bounds_subs[sub.get_channel_name()] before. I think this demonstrates why I never much liked the approach of creating tasks to make async stuff look sync, but we had many of these before, so making our async code more sound will be a big undertaking of its own.
As for the fix, we should probably make some of this state task-local by passing it as an argument instead of using shared instance state.
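A minimal sketch of that task-local idea, with asyncio futures standing in for the oneshot receivers (names are illustrative): each `_send_request`-style task awaits exactly the receiver it was handed as an argument, so concurrent calls cannot cross-wire each other the way shared instance state can.

```python
import asyncio


async def send_request(receiver: asyncio.Future) -> str:
    # Task-local: each task awaits only the receiver it was given,
    # never a shared attribute that a later call might overwrite.
    return await receiver


async def main() -> tuple[str, str]:
    loop = asyncio.get_running_loop()
    recv_a: asyncio.Future = loop.create_future()
    recv_b: asyncio.Future = loop.create_future()
    task_a = asyncio.create_task(send_request(recv_a))
    task_b = asyncio.create_task(send_request(recv_b))
    # Replies arrive in any order; each task still gets its own.
    recv_b.set_result("reply_B")
    recv_a.set_result("reply_A")
    return await task_a, await task_b


reply_a, reply_b = asyncio.run(main())
print(reply_a, reply_b)  # reply_A reply_B
```

With the receiver passed as an argument, overwriting any instance attribute between the two calls is harmless.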
Since it took me a bit to understand the whole dimension of this, in case it is useful, this is the whole AI analysis (which includes details on a possible fix):
AI analysis and fix proposal (long)
The Race Condition in power_status — Detailed Explanation
The Setup
Each pool class (BatteryPool, EVChargerPool, PVPool) stores two pieces of per-request handshake state as instance attributes:
```python
self._report_stream_receiver: Receiver[Receiver[_Report]] | None = None
self._pipe: Pipe[_Report] | None = None
```

When `power_status` is accessed, it:
1. Creates a `OneshotChannel` pair (sender + receiver)
2. Stores the receiver on `self._report_stream_receiver` (overwrites any previous value)
3. Creates a `ReportRequest` carrying the sender
4. Spawns an async task (`_send_request`) that sends the request to the power manager, then reads from `self._report_stream_receiver` to get the reply
The Race
get_channel_name() returns f"power_manager.report.{self.component_ids=}.{self.priority=}" — it's deterministic for a given pool instance. Two calls on the same instance produce the same key. Here's the interleaving:
Call A:                                       Call B:
──────                                        ──────
sender_A, recv_A = OneshotChannel()
self._report_stream_receiver = recv_A  ←── overwritten below
task_A = create_task(_send_request(...))
power_bounds_subs[key] = task_A
                                              sender_B, recv_B = OneshotChannel()
                                              self._report_stream_receiver = recv_B
                                              task_B = create_task(_send_request(...))
                                              power_bounds_subs[key] = task_B  ←── overwrites task_A ref
─── async tasks resume ───
task_A: sends request_A via power manager
        await self._report_stream_receiver.receive()
        ⬆ reads recv_B (not recv_A!) — self was overwritten
                                              task_B: sends request_B via power manager
                                                      await self._report_stream_receiver.receive()
                                                      ⬆ also reads recv_B — contends with task_A
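The cross-wiring can be reproduced with a few lines of plain asyncio. This is a hypothetical stand-in, not the SDK code: `asyncio.Queue` plays the role of the oneshot channel, and `Pool.subscribe` plays the role of `power_status`.

```python
import asyncio


class Pool:
    """Hypothetical stand-in for the pools' shared handshake state."""

    def __init__(self) -> None:
        # One shared slot, like self._report_stream_receiver.
        self._reply_queue: asyncio.Queue[str] | None = None

    def subscribe(self, label: str) -> tuple[asyncio.Queue[str], asyncio.Task[str]]:
        queue: asyncio.Queue[str] = asyncio.Queue()
        self._reply_queue = queue  # overwrites any previous caller's queue
        return queue, asyncio.create_task(self._await_reply(label))

    async def _await_reply(self, label: str) -> str:
        assert self._reply_queue is not None
        # Reads whatever the shared slot points to *now*, not at call time.
        return f"{label} got {await self._reply_queue.get()}"


async def main() -> list[str]:
    pool = Pool()
    queue_a, task_a = pool.subscribe("A")
    queue_b, task_b = pool.subscribe("B")  # overwrites the slot before task A runs
    # The "actor" dutifully replies on each caller's own queue:
    queue_a.put_nowait("reply_A")
    queue_b.put_nowait("reply_B")
    done, pending = await asyncio.wait({task_a, task_b}, timeout=0.1)
    # Both tasks read queue_b: one steals the other's reply, the other hangs.
    return sorted(task.result() for task in done) + [f"pending={len(pending)}"]


print(asyncio.run(main()))  # → ['A got reply_B', 'pending=1']
```

Task A gets B's reply (cross-wired), task B hangs forever on the already-drained queue, and reply_A is never read: all three symptoms from the table above in one run.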
Three bugs compound:

| What gets overwritten | Consequence |
|---|---|
| `self._report_stream_receiver` | Task A reads from the wrong oneshot (recv_B). The actor correctly sends reply_A through sender_A → recv_A, but nobody reads recv_A. Task A either gets reply_B (cross-wired) or hangs forever waiting on recv_B if task B consumes it first. |
| `self._pipe` | When task B creates its Pipe and stores it in self._pipe, task A's Pipe loses its only reference and becomes eligible for GC. The comment on line 87 even says "Keep a reference to prevent garbage collector from destroying pipe" — the race defeats this exact protection. |
| `power_bounds_subs[key]` | The dict uses get_channel_name() as key, which is identical for both calls. Task A's handle is silently replaced. The task still runs (the event loop holds it), but it can no longer be cancelled via stop(). |
Why This Hasn't Blown Up (Yet)
In practice, power_status is typically accessed once per pool instance during setup. The race requires two calls before the first task's await yields — a narrow window. But it's a latent correctness bug that becomes a hard-to-diagnose hang under any workload that re-subscribes.
The Fix
The fix follows the pattern already used elsewhere in this codebase (_resampling.py:63–110, _grid_frequency.py:116–130): make the oneshot receiver task-local by passing it as a parameter, and collect pipes in a list instead of a single slot.
Here are the exact changes for all three files:
1. _battery_pool.py
__init__ — replace the two instance attrs with a pipe list:
# BEFORE (lines 86–88):
self._report_stream_receiver: Receiver[Receiver[_Report]] | None = None
# Keep a reference to prevent garbage collector from destroying pipe
self._pipe: Pipe[_Report] | None = None

# AFTER:
# Keep references to prevent garbage collector from destroying pipes
self._pipes: list[Pipe[_Report]] = []

power_status — make the receiver local, pass it to the task:
# BEFORE (lines 344–365):
report_stream_sender, self._report_stream_receiver = OneshotChannel[
    BroadcastReceiver[_Report]
]()
request = _power_managing.ReportRequest(
    source_id=self._source_id,
    priority=self._priority,
    component_ids=self._pool_ref_store._batteries,
    report_stream_sender=report_stream_sender,
)
forwarding_channel = Broadcast[_Report](
    name=request.get_channel_name() + ":Forwarded",
    resend_latest=True,
)
self._pool_ref_store._power_bounds_subs[request.get_channel_name()] = (
    asyncio.create_task(
        self._send_request(forwarding_channel.new_sender(), request)
    )
)
return forwarding_channel

# AFTER:
report_stream_sender, report_stream_receiver = OneshotChannel[
    BroadcastReceiver[_Report]
]()
request = _power_managing.ReportRequest(
    source_id=self._source_id,
    priority=self._priority,
    component_ids=self._pool_ref_store._batteries,
    report_stream_sender=report_stream_sender,
)
forwarding_channel = Broadcast[_Report](
    name=request.get_channel_name() + ":Forwarded",
    resend_latest=True,
)
self._pool_ref_store._power_bounds_subs[request.get_channel_name()] = (
    asyncio.create_task(
        self._send_request(
            forwarding_channel.new_sender(),
            request,
            report_stream_receiver,
        )
    )
)
return forwarding_channel

_send_request — accept receiver as parameter, append pipe to list:

# BEFORE (lines 367–385):
async def _send_request(
    self,
    forwarding_sender: Sender[_Report],
    request: ReportRequest,
) -> None:
    """Send the report request and receive the report channel.

    Connect it via pipe to the channel that was returned in power_status.
    """
    await self._pool_ref_store._power_manager_bounds_subscription_sender.send(
        request
    )
    assert self._report_stream_receiver is not None
    report_receiver: Receiver[_Report] = (
        await self._report_stream_receiver.receive()
    )
    self._pipe = Pipe(report_receiver, forwarding_sender)
    await self._pipe.start()

# AFTER:
async def _send_request(
    self,
    forwarding_sender: Sender[_Report],
    request: ReportRequest,
    report_stream_receiver: Receiver[Receiver[_Report]],
) -> None:
    """Send the report request and receive the report channel.

    Connect it via pipe to the channel that was returned in power_status.
    """
    await self._pool_ref_store._power_manager_bounds_subscription_sender.send(
        request
    )
    report_receiver: Receiver[_Report] = await report_stream_receiver.receive()
    pipe = Pipe(report_receiver, forwarding_sender)
    self._pipes.append(pipe)
    await pipe.start()

2. _ev_charger_pool.py
The same three-part change, adjusted for this file's naming conventions:
__init__ (lines 73–75):
# BEFORE:
self._report_stream_receiver: Receiver[Receiver[_Report]] | None = None
# Keep a reference to prevent garbage collector from destroying pipe
self._pipe: Pipe[_Report] | None = None

# AFTER:
# Keep references to prevent garbage collector from destroying pipes
self._pipes: list[Pipe[_Report]] = []

power_status (lines 186–207):
# BEFORE:
report_stream_sender, self._report_stream_receiver = OneshotChannel[
    BroadcastReceiver[_Report]
]()
request = _power_managing.ReportRequest(
    source_id=self._source_id,
    priority=self._priority,
    component_ids=self._pool_ref_store.component_ids,
    report_stream_sender=report_stream_sender,
)
forwarding_channel = Broadcast[_Report](
    name=request.get_channel_name() + ":Forwarded",
    resend_latest=True,
)
self._pool_ref_store.power_bounds_subs[request.get_channel_name()] = (
    asyncio.create_task(
        self._send_request(forwarding_channel.new_sender(), request)
    )
)
return forwarding_channel

# AFTER:
report_stream_sender, report_stream_receiver = OneshotChannel[
    BroadcastReceiver[_Report]
]()
request = _power_managing.ReportRequest(
    source_id=self._source_id,
    priority=self._priority,
    component_ids=self._pool_ref_store.component_ids,
    report_stream_sender=report_stream_sender,
)
forwarding_channel = Broadcast[_Report](
    name=request.get_channel_name() + ":Forwarded",
    resend_latest=True,
)
self._pool_ref_store.power_bounds_subs[request.get_channel_name()] = (
    asyncio.create_task(
        self._send_request(
            forwarding_channel.new_sender(),
            request,
            report_stream_receiver,
        )
    )
)
return forwarding_channel

_send_request (lines 209–225):

# BEFORE:
async def _send_request(
    self,
    forwarding_sender: Sender[_Report],
    request: ReportRequest,
) -> None:
    """Send the report request and receive the report channel.

    Connect it via pipe to the channel that was returned in power_status.
    """
    await self._pool_ref_store.power_manager_bounds_subs_sender.send(request)
    assert self._report_stream_receiver is not None
    report_receiver: Receiver[_Report] = (
        await self._report_stream_receiver.receive()
    )
    self._pipe = Pipe(report_receiver, forwarding_sender)
    await self._pipe.start()

# AFTER:
async def _send_request(
    self,
    forwarding_sender: Sender[_Report],
    request: ReportRequest,
    report_stream_receiver: Receiver[Receiver[_Report]],
) -> None:
    """Send the report request and receive the report channel.

    Connect it via pipe to the channel that was returned in power_status.
    """
    await self._pool_ref_store.power_manager_bounds_subs_sender.send(request)
    report_receiver: Receiver[_Report] = await report_stream_receiver.receive()
    pipe = Pipe(report_receiver, forwarding_sender)
    self._pipes.append(pipe)
    await pipe.start()

3. _pv_pool.py
Identical structure to EV charger:
__init__ (lines 68–70):
# BEFORE:
self._report_stream_receiver: Receiver[Receiver[_Report]] | None = None
# Keep a reference to prevent garbage collector from destroying pipe
self._pipe: Pipe[_Report] | None = None

# AFTER:
# Keep references to prevent garbage collector from destroying pipes
self._pipes: list[Pipe[_Report]] = []

power_status (lines 158–179):
# BEFORE:
report_stream_sender, self._report_stream_receiver = OneshotChannel[
    BroadcastReceiver[_Report]
]()
...
self._pool_ref_store.power_bounds_subs[request.get_channel_name()] = (
    asyncio.create_task(
        self._send_request(forwarding_channel.new_sender(), request)
    )
)

# AFTER:
report_stream_sender, report_stream_receiver = OneshotChannel[
    BroadcastReceiver[_Report]
]()
...
self._pool_ref_store.power_bounds_subs[request.get_channel_name()] = (
    asyncio.create_task(
        self._send_request(
            forwarding_channel.new_sender(),
            request,
            report_stream_receiver,
        )
    )
)

_send_request (lines 181–197):

# BEFORE:
async def _send_request(
    self,
    forwarding_sender: Sender[_Report],
    request: ReportRequest,
) -> None:
    ...
    await self._pool_ref_store.power_manager_bounds_subs_sender.send(request)
    assert self._report_stream_receiver is not None
    report_receiver: Receiver[_Report] = (
        await self._report_stream_receiver.receive()
    )
    self._pipe = Pipe(report_receiver, forwarding_sender)
    await self._pipe.start()

# AFTER:
async def _send_request(
    self,
    forwarding_sender: Sender[_Report],
    request: ReportRequest,
    report_stream_receiver: Receiver[Receiver[_Report]],
) -> None:
    ...
    await self._pool_ref_store.power_manager_bounds_subs_sender.send(request)
    report_receiver: Receiver[_Report] = await report_stream_receiver.receive()
    pipe = Pipe(report_receiver, forwarding_sender)
    self._pipes.append(pipe)
    await pipe.start()

Optional Further Hardening: Unique Task Keys
The power_bounds_subs dict uses get_channel_name() as key, which is the same for repeated calls on the same instance. This silently drops the old task reference. While the task still runs (the event loop keeps it alive), stop() can no longer cancel it. A straightforward fix would be to append a uuid to the key:
task_key = f"{request.get_channel_name()}:{uuid.uuid4()}"
self._pool_ref_store._power_bounds_subs[task_key] = asyncio.create_task(...)

This is lower severity since it only affects cleanup, but it prevents orphaned tasks from surviving stop().
Summary
| Root cause | Shared mutable state (self._report_stream_receiver, self._pipe) written by sync code, read by async tasks |
|---|---|
| Fix principle | Make per-request state task-local: pass the oneshot receiver as a function parameter, collect pipes in a list |
| Files changed | 3 files, ~15 lines each, all mechanical |
| Risk | Very low — the change is strictly narrower in scope (removes shared state), follows existing codebase patterns, and the external contract (ReceiverFetcher return type) is unchanged |
| Precedent in codebase | _resampling.py:63–110, _grid_frequency.py:116–130, _voltage_streamer.py:139–166 all use this local-oneshot pattern correctly |
Note
This applies to the other pools too.
    telem_stream_sender=sender,
)
await self._data_sourcing_request_sender.send(data_source_request)
return await receiver.receive()
It seems this is introducing a new subtle concurrency bug (also found with AI).
ComponentMetricsResamplingActor processes subscription requests strictly serially in _process_resampling_requests() via async for request ...: await self._subscribe(request) (src/frequenz/sdk/microgrid/_resampling.py:112-115).
Inside that, _subscribe() calls _subscribe_to_data_source() (this method), which sends a data-sourcing request and then blocks the request loop waiting for a oneshot reply with return await receiver.receive(). From the channels source, that oneshot wait has no built-in timeout: it wakes only if someone calls send() or aclose(). If neither happens, it can wait forever.
Before the PR a slow upstream setup might delay data arrival, but not subscription registration in the resampling actor. The resampling actor could still keep draining its incoming request queue because it only needed the channel name and local registry wiring.
After the PR a slow upstream setup delays subscription registration itself. Now the resampling actor cannot process later requests until the current one gets its oneshot reply.
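The head-of-line blocking is easy to demonstrate with a stripped-down, hypothetical model of the serial request loop, where plain `asyncio.Queue`s stand in for the SDK's channels and the oneshot reply:

```python
import asyncio


async def serial_consumer(requests: asyncio.Queue, handled: list[str]) -> None:
    """One-at-a-time request loop, like _process_resampling_requests()."""
    while True:
        name, reply = await requests.get()
        await reply.get()  # like `await receiver.receive()`: no timeout, may never wake
        handled.append(name)


async def main() -> list[str]:
    requests: asyncio.Queue = asyncio.Queue()
    handled: list[str] = []
    consumer = asyncio.create_task(serial_consumer(requests, handled))

    dead_reply: asyncio.Queue = asyncio.Queue()  # nobody will ever answer this one
    good_reply: asyncio.Queue = asyncio.Queue()
    good_reply.put_nowait("ok")

    requests.put_nowait(("bad", dead_reply))
    requests.put_nowait(("good", good_reply))  # queued behind "bad"

    await asyncio.sleep(0.1)
    consumer.cancel()
    return handled  # "good" was never processed: head-of-line blocking


print(asyncio.run(main()))  # → []
```

Even though the second request's reply is ready the whole time, the loop never reaches it because the first request's wait never completes.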
Full AI analysis and fix proposal (long)
- What actually stalls
  - Resampling request A arrives.
  - `_process_resampling_requests()` starts handling A and blocks in `_subscribe_to_data_source()` waiting for the data-sourcing reply.
  - Requests B, C, D are still accepted into the resampling actor's request channel, but they are not processed because the loop never gets past A.
  - The queue is bounded at `500` (src/frequenz/sdk/microgrid/_data_pipeline.py:54-59, 460-468), so after enough blocked requests, senders themselves start back-pressuring.
- Why the reply can disappear
  - The happy path is `MicrogridApiSource._get_metric_senders()` sending `telem_stream.new_receiver()` back through `request.telem_stream_sender` (src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py:415-420).
  - But there are bad paths where no reply is sent at all. One concrete one is the unknown-component case in `add_metric()`, which logs and returns without replying (microgrid_api_source.py:521-525).
  - So the resampling actor can end up waiting forever on a reply that will never come.
- Implications / risk
  - This is broader than "one request fails": one bad subscription can delay unrelated later subscriptions behind it.
  - It does not necessarily crash the actor, which makes it worse operationally: a stuck `await receiver.receive()` just hangs; it doesn't trigger restart/recovery.
  - In practice this means callers waiting for new resampled streams can hang, and under load the request queue can fill and propagate backpressure outward.
Recommended fix
I’d treat this as a two-layer fix:
- Fix the root cause in data sourcing
  Data sourcing should never silently drop a request without replying. On invalid requests, it should explicitly fail the oneshot handshake instead of just returning.
- Make resampling robust against missing replies anyway
  Even if data sourcing is fixed, resampling should not let one bad handshake stall all later requests.
The safest non-breaking design is:
A. Stop processing subscriptions inline
Instead of this:
async for request in self._resampling_request_receiver:
    await self._subscribe(request)

process each request in its own task and supervise those tasks.
B. Coalesce duplicate in-flight subscriptions
Because concurrent tasks can race on the same request_channel_name, keep a pending_subscriptions map so repeated requests share the same initialization task/future instead of creating duplicate source subscriptions.
C. Bound the oneshot wait
Wrap the data-sourcing reply wait in a timeout, e.g. asyncio.timeout(...), so a lost reply becomes a logged failure instead of a permanent hang.
Suggested shape
Something like this internally:
self._pending_subscriptions: dict[str, asyncio.Task[Broadcast[Sample[Quantity]]]] = {}
self._subscription_tasks: set[asyncio.Task[None]] = set()

Then:
async def _subscribe_to_data_source(
    self, request: ComponentMetricRequest
) -> BroadcastReceiver[Sample[Quantity]]:
    sender, receiver = OneshotChannel[BroadcastReceiver[Sample[Quantity]]]()
    data_source_request = ComponentMetricRequest(
        namespace=request.namespace + ":Source",
        component_id=request.component_id,
        metric=request.metric,
        start_time=request.start_time,
        telem_stream_sender=sender,
    )
    await self._data_sourcing_request_sender.send(data_source_request)
    async with asyncio.timeout(5.0):
        return await receiver.receive()

And coalesced initialization:
async def _ensure_subscription(
    self, request: ComponentMetricRequest
) -> Broadcast[Sample[Quantity]]:
    request_channel_name = request.get_channel_name()
    if channel := self._data_sink_channels.get(request_channel_name):
        return channel
    if pending := self._pending_subscriptions.get(request_channel_name):
        return await pending

    async def init() -> Broadcast[Sample[Quantity]]:
        data_source = await self._subscribe_to_data_source(request)
        data_sink_channel = Broadcast(name=request_channel_name, resend_latest=True)
        self._data_sink_channels[request_channel_name] = data_sink_channel
        self._resampler.add_timeseries(
            name=request_channel_name,
            source=data_source,
            sink=data_sink_channel.new_sender().send,
        )
        return data_sink_channel

    task = asyncio.create_task(init(), name=f"init:{request_channel_name}")
    self._pending_subscriptions[request_channel_name] = task
    try:
        return await task
    finally:
        self._pending_subscriptions.pop(request_channel_name, None)

Then the request loop can stay responsive:
async def _handle_subscription_request(self, request: ComponentMetricRequest) -> None:
    channel = await self._ensure_subscription(request)
    await request.telem_stream_sender.send(channel.new_receiver())

async def _process_resampling_requests(self) -> None:
    async for request in self._resampling_request_receiver:
        task = asyncio.create_task(
            self._handle_subscription_request(request),
            name=f"subscribe:{request.get_channel_name()}",
        )
        self._subscription_tasks.add(task)
        task.add_done_callback(self._subscription_tasks.discard)

Why this is the best fix
- It removes the head-of-line blocking.
- It preserves the current external API.
- It prevents duplicate initialization races for the same request.
- It turns “wait forever” into a bounded failure.
- It still works even if data sourcing regresses again later.
If you want, I can also turn this into a minimal patch against _resampling.py and sketch the matching data-sourcing change.
try:
    await sub.report_stream_sender.send(channel.new_receiver())
except SenderClosedError:
    pass
Also in here, I would always document why we ignore an error, with a debug log line. I feel that otherwise, if something goes wrong here, we'll be too blind when trying to figure out what happened.
@llucax thank you for raising these points. I agree that we're adding a lot of potentially problematic code here, and it will be even more in phase 2 (teardown of closed channels / propagating closed senders), which I have lying around on a branch locally. I am okay with closing this PR. I am not married to it just because I wrote it; this was simply my second task overall, and I did not foresee the consequences yet or have enough of an overview myself to propose a better alternative. @shsms what do you think?
Add targeted tests adapted to the v1.x.x branch that verify behavior for issues identified in the pull request frequenz-floss#1371: - Data sourcing: Verify duplicate subscription and unknown components with same channel name does not hang. There is also a test for invalid metrics, but this one is currently failing (it hangs), but since we plan to remove the data sourcing actor, we delay the fix for now. - Resampling: Verify invalid metric request does not block later valid request. - Power status subscriptions (battery, EV charger, PV pools): Verify same-instance subscriptions share the registry channel correctly. Signed-off-by: Leandro Lucarella <luca-frequenz@llucax.com>
Sorry for the late reply, I somehow missed the notification.
OK, interesting, I was expecting auto-closing to actually simplify the code, so you are saying it makes it even more complex? Maybe it is worth creating a draft PR with it so we can get a better idea of how it makes things better or worse, at least if you have something that could be useful to share.
Yeah, to be honest I also didn't see this coming. I think removing the channel registry is still a good thing, as it has its own issues; it's just that I wouldn't remove it at all costs.
This removes the `ChannelRegistry` class.

Background

This is part of an initiative to close unused channels to save resources. The `ChannelRegistry` was a de-facto singleton class for storing channels by name. Request objects such as `ComponentMetricRequest` or `ReportRequest` have a `get_channel_name()` method, and the returned channel name acted as a contract between sender and receiver: with the name, both could obtain the same channel from the `ChannelRegistry`. By getting rid of the registry, we are moving towards a target state where only references to senders and receivers, but no channels, are being held.

Implementation
ComponentMetricRequest

`ComponentMetricRequest` objects represent a request to receive metrics from a component, sometimes also referred to as a telemetry stream. To serve such a request without relying on the `ChannelRegistry`, the request got a new attribute: `telem_stream_sender: Sender[Receiver[Sample[Quantity]]]`. This is the sender of a one-shot channel that is used to send the receiver of the telemetry stream back to the original requester, similar to a callback function. The receiver of a component metric request should therefore create (and own) the channel that sends the metric data, and use the one-shot channel to send a receiver of that channel "back".

In a few places, component metric requests were created and sent from synchronous functions which returned the receiver. To make this work with the round-trip outlined above, an additional channel was added that merely forwards the metric data via a `Pipe`. This allows us to immediately return a receiver while the channel that serves the metric data is being constructed asynchronously.
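The forwarding trick can be sketched with plain asyncio primitives. All names here are hypothetical stand-ins, not the SDK API: `asyncio.Queue` plays both the oneshot and the forwarding channel, and `_connect` plays the `Pipe`:

```python
import asyncio


async def _connect(forward: asyncio.Queue, reply: asyncio.Queue) -> None:
    """Pipe stand-in: wait for the real data receiver, then forward its items."""
    source: asyncio.Queue = await reply.get()  # completes the oneshot round-trip
    while True:
        forward.put_nowait(await source.get())


def new_receiver(request_sender: asyncio.Queue) -> asyncio.Queue:
    """Synchronously return a receiver while the subscription completes async."""
    reply: asyncio.Queue = asyncio.Queue(maxsize=1)  # oneshot channel stand-in
    forward: asyncio.Queue = asyncio.Queue()  # forwarding channel stand-in
    request_sender.put_nowait(("subscribe", reply))  # like a metric request
    asyncio.get_running_loop().create_task(_connect(forward, reply))
    return forward  # the caller can start receiving immediately


async def demo() -> list[int]:
    requests: asyncio.Queue = asyncio.Queue()
    receiver = new_receiver(requests)  # sync call, receiver available right away

    # Hypothetical "data sourcing" side: owns the data channel, replies with it.
    _, reply = await requests.get()
    data: asyncio.Queue = asyncio.Queue()
    reply.put_nowait(data)
    for i in (1, 2, 3):
        data.put_nowait(i)

    return [await asyncio.wait_for(receiver.get(), 1.0) for _ in range(3)]


print(asyncio.run(demo()))  # → [1, 2, 3]
```

`new_receiver` stays a plain synchronous function, while the actual subscription handshake and the piping happen in a background task.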
ReportRequest

`ReportRequest` objects are a different kind of request that was handled similarly.

Channel ownership
The receivers of request objects create the channels and own them. For instance, in the case of `ComponentMetricRequest`s, `MicrogridApiSource` now maintains a dictionary (`channel_lookup: dict[str, Broadcast]`) as a replacement for the channel registry. This lookup dictionary is necessary to avoid creating duplicate channels for similar metric requests: when a matching channel already exists, a new receiver is created from it instead.

Forwarding channels are owned by their creator (e.g. `GridFrequency`). These (and the `Pipe`s that connect them to the other channels) are currently never closed. Closing them when all receivers or senders are closed is a TODO for a later iteration.
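The lookup-or-create deduplication described above, in a dependency-free, hypothetical sketch (the channel names are made up, and `_FakeBroadcast` merely stands in for the real broadcast channel type):

```python
class _FakeBroadcast:
    """Minimal stand-in for a broadcast channel (hypothetical, not the real API)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.receivers: list[object] = []

    def new_receiver(self) -> object:
        receiver = object()
        self.receivers.append(receiver)
        return receiver


class MetricSource:
    """Sketch of the dedup pattern: one owned channel per name, many receivers."""

    def __init__(self) -> None:
        self._channel_lookup: dict[str, _FakeBroadcast] = {}

    def serve(self, channel_name: str) -> object:
        channel = self._channel_lookup.get(channel_name)
        if channel is None:  # first request for this name: create and own it
            channel = self._channel_lookup[channel_name] = _FakeBroadcast(channel_name)
        return channel.new_receiver()  # later requests only get a new receiver


source = MetricSource()
r1 = source.serve("meter_7:ACTIVE_POWER")
r2 = source.serve("meter_7:ACTIVE_POWER")  # same name -> same channel, new receiver
r3 = source.serve("meter_8:ACTIVE_POWER")  # new name -> new channel
print(len(source._channel_lookup))  # → 2
```

Repeated requests for the same channel name share one channel, which is the broadcast behavior the registry used to provide, now scoped to the component that owns the data.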