Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions examples/battery_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ async def main() -> None:
receivers = [
battery_pool.soc.new_receiver(limit=1),
battery_pool.capacity.new_receiver(limit=1),
# pylint: disable-next=protected-access
battery_pool._system_power_bounds.new_receiver(limit=1),
battery_pool.system_power_bounds.new_receiver(limit=1),
]

async for metric in merge(*receivers):
Expand Down
2 changes: 1 addition & 1 deletion src/frequenz/sdk/microgrid/_data_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ def new_battery_pool(
self._battery_power_wrapper.distribution_results_fetcher()
),
min_update_interval=self._resampler_config.resampling_period,
batteries_id=component_ids,
component_ids=component_ids,
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,20 +149,17 @@ def _add_system_bounds_tracker(self, component_ids: frozenset[ComponentId]) -> N
battery_pool = _data_pipeline.new_battery_pool(
priority=-sys.maxsize - 1, component_ids=component_ids
)
# pylint: disable-next=protected-access
bounds_receiver = battery_pool._system_power_bounds.new_receiver()
bounds_receiver = battery_pool.system_power_bounds.new_receiver()
elif issubclass(self._component_class, EvCharger):
ev_charger_pool = _data_pipeline.new_ev_charger_pool(
priority=-sys.maxsize - 1, component_ids=component_ids
)
# pylint: disable-next=protected-access
bounds_receiver = ev_charger_pool._system_power_bounds.new_receiver()
bounds_receiver = ev_charger_pool.system_power_bounds.new_receiver()
elif issubclass(self._component_class, SolarInverter):
pv_pool = _data_pipeline.new_pv_pool(
priority=-sys.maxsize - 1, component_ids=component_ids
)
# pylint: disable-next=protected-access
bounds_receiver = pv_pool._system_power_bounds.new_receiver()
bounds_receiver = pv_pool.system_power_bounds.new_receiver()
else:
_logger.error(
"PowerManagingActor: Unsupported component class: %s",
Expand Down
169 changes: 29 additions & 140 deletions src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,17 @@
"""

import asyncio
import uuid
from collections import abc

from frequenz.client.common.microgrid.components import ComponentId
from frequenz.quantities import Energy, Percentage, Power, Temperature
from typing_extensions import override

from ... import timeseries
from ..._internal._channels import MappingReceiverFetcher, ReceiverFetcher
from ...microgrid import _power_distributing, _power_managing, connection_manager
from ..._internal._channels import ReceiverFetcher
from ...microgrid import _power_managing, connection_manager
from ...timeseries import Sample
from .._base_types import SystemBounds
from ..formulas._formula import Formula
from ..component_pool import ComponentPool
from ..formulas import Formula
from ._battery_pool_reference_store import BatteryPoolReferenceStore
from ._methods import SendOnUpdate
from ._metric_calculator import (
Expand All @@ -34,7 +33,7 @@
# pylint: disable=protected-access


class BatteryPool:
class BatteryPool(ComponentPool[BatteryPoolReferenceStore, BatteryPoolReport]):
"""An interface for interaction with pools of batteries.

Provides:
Expand All @@ -50,65 +49,6 @@ class BatteryPool:
[propose_discharge][frequenz.sdk.timeseries.battery_pool.BatteryPool.propose_discharge].
"""

def __init__(
self,
*,
pool_ref_store: BatteryPoolReferenceStore,
name: str | None,
priority: int,
):
"""Create a BatteryPool instance.

!!! note
`BatteryPool` instances are not meant to be created directly by users. Use
the [`microgrid.new_battery_pool`][frequenz.sdk.microgrid.new_battery_pool]
method for creating `BatteryPool` instances.

Args:
pool_ref_store: The battery pool reference store instance.
name: An optional name used to identify this instance of the pool or a
corresponding actor in the logs.
priority: The priority of the actor using this wrapper.
"""
self._pool_ref_store = pool_ref_store
unique_id = str(uuid.uuid4())
self._source_id = unique_id if name is None else f"{name}-{unique_id}"
self._priority = priority

async def propose_power(
self,
power: Power | None,
*,
bounds: timeseries.Bounds[Power | None] = timeseries.Bounds(None, None),
) -> None:
"""Send a proposal to the power manager for the pool's set of batteries.

Power values need to follow the Passive Sign Convention (PSC). That is, positive
values indicate charge power and negative values indicate discharge power.

Details on how the power manager handles proposals can be found in the
[Microgrid][frequenz.sdk.microgrid--setting-power] documentation.

Args:
power: The power to propose for the batteries in the pool. If `None`, this
proposal will not have any effect on the target power, unless bounds are
specified. When specified without bounds, bounds for lower priority
actors will be shifted by this power. If both are `None`, it is
equivalent to not having a proposal or withdrawing a previous one.
bounds: The power bounds for the proposal. When specified, this will limit
the bounds for lower priority actors.
"""
await self._pool_ref_store._power_manager_requests_sender.send(
_power_managing.Proposal(
source_id=self._source_id,
preferred_power=power,
bounds=bounds,
component_ids=self._pool_ref_store._batteries,
priority=self._priority,
creation_time=asyncio.get_running_loop().time(),
)
)

async def propose_charge(self, power: Power | None) -> None:
"""Set the given charge power for the batteries in the pool.

Expand All @@ -133,12 +73,12 @@ async def propose_charge(self, power: Power | None) -> None:
"""
if power and power < Power.zero():
raise ValueError("Charge power must be positive.")
await self._pool_ref_store._power_manager_requests_sender.send(
await self._pool_ref_store.power_manager_requests_sender.send(
_power_managing.Proposal(
source_id=self._source_id,
preferred_power=power,
bounds=timeseries.Bounds(None, None),
component_ids=self._pool_ref_store._batteries,
component_ids=self._pool_ref_store.component_ids,
priority=self._priority,
creation_time=asyncio.get_running_loop().time(),
)
Expand Down Expand Up @@ -170,27 +110,19 @@ async def propose_discharge(self, power: Power | None) -> None:
if power < Power.zero():
raise ValueError("Discharge power must be positive.")
power = -power
await self._pool_ref_store._power_manager_requests_sender.send(
await self._pool_ref_store.power_manager_requests_sender.send(
_power_managing.Proposal(
source_id=self._source_id,
preferred_power=power,
bounds=timeseries.Bounds(None, None),
component_ids=self._pool_ref_store._batteries,
component_ids=self._pool_ref_store.component_ids,
priority=self._priority,
creation_time=asyncio.get_running_loop().time(),
)
)

@property
def component_ids(self) -> abc.Set[ComponentId]:
"""Return ids of the batteries in the pool.

Returns:
Ids of the batteries in the pool
"""
return self._pool_ref_store._batteries

@property
@override
def power(self) -> Formula[Power]:
"""Fetch the total power of the batteries in the pool.

Expand All @@ -206,10 +138,10 @@ def power(self) -> Formula[Power]:
A Formula that will calculate and stream the total power of all
batteries in the pool.
"""
return self._pool_ref_store._formula_pool.from_power_formula(
return self._pool_ref_store.formula_pool.from_power_formula(
"battery_pool_power",
connection_manager.get().component_graph.battery_formula(
self._pool_ref_store._batteries
self._pool_ref_store.component_ids
),
)

Expand Down Expand Up @@ -248,10 +180,12 @@ def soc(self) -> ReceiverFetcher[Sample[Percentage]]:
batteries in the pool, considering only working batteries with
operational inverters.
"""
assert isinstance(self._pool_ref_store, BatteryPoolReferenceStore)

method_name = SendOnUpdate.name() + "_" + SoCCalculator.name()

if method_name not in self._pool_ref_store._active_methods:
calculator = SoCCalculator(self._pool_ref_store._batteries)
calculator = SoCCalculator(self._pool_ref_store.component_ids)
self._pool_ref_store._active_methods[method_name] = SendOnUpdate(
metric_calculator=calculator,
working_batteries=self._pool_ref_store._working_batteries,
Expand All @@ -268,9 +202,12 @@ def temperature(self) -> ReceiverFetcher[Sample[Temperature]]:
A MetricAggregator that will calculate and stream the average temperature
of all batteries in the pool.
"""
assert isinstance(self._pool_ref_store, BatteryPoolReferenceStore)

method_name = SendOnUpdate.name() + "_" + TemperatureCalculator.name()

if method_name not in self._pool_ref_store._active_methods:
calculator = TemperatureCalculator(self._pool_ref_store._batteries)
calculator = TemperatureCalculator(self._pool_ref_store.component_ids)
self._pool_ref_store._active_methods[method_name] = SendOnUpdate(
metric_calculator=calculator,
working_batteries=self._pool_ref_store._working_batteries,
Expand Down Expand Up @@ -305,10 +242,12 @@ def capacity(self) -> ReceiverFetcher[Sample[Energy]]:
batteries in the pool, considering only working batteries with
operational inverters.
"""
assert isinstance(self._pool_ref_store, BatteryPoolReferenceStore)

method_name = SendOnUpdate.name() + "_" + CapacityCalculator.name()

if method_name not in self._pool_ref_store._active_methods:
calculator = CapacityCalculator(self._pool_ref_store._batteries)
calculator = CapacityCalculator(self._pool_ref_store.component_ids)
self._pool_ref_store._active_methods[method_name] = SendOnUpdate(
metric_calculator=calculator,
working_batteries=self._pool_ref_store._working_batteries,
Expand All @@ -317,59 +256,16 @@ def capacity(self) -> ReceiverFetcher[Sample[Energy]]:

return self._pool_ref_store._active_methods[method_name]

@override
@property
def power_status(self) -> ReceiverFetcher[BatteryPoolReport]:
"""Get a receiver to receive new power status reports when they change.

These include
- the current inclusion/exclusion bounds available for the pool's priority,
- the current target power for the pool's set of batteries,
- the result of the last distribution request for the pool's set of batteries.

Returns:
A receiver that will stream power status reports for the pool's priority.
"""
sub = _power_managing.ReportRequest(
source_id=self._source_id,
priority=self._priority,
component_ids=self._pool_ref_store._batteries,
)
self._pool_ref_store._power_bounds_subs[sub.get_channel_name()] = (
asyncio.create_task(
self._pool_ref_store._power_manager_bounds_subscription_sender.send(sub)
)
)
channel = self._pool_ref_store._channel_registry.get_or_create(
_power_managing._Report, sub.get_channel_name()
)
channel.resend_latest = True

return channel

@property
def power_distribution_results(self) -> ReceiverFetcher[_power_distributing.Result]:
"""Get a receiver to receive power distribution results.

Returns:
A receiver that will stream power distribution results for the pool's set of
batteries.
"""
return MappingReceiverFetcher(
self._pool_ref_store._power_dist_results_fetcher,
lambda recv: recv.filter(
lambda x: x.request.component_ids == self._pool_ref_store._batteries
),
)

@property
def _system_power_bounds(self) -> ReceiverFetcher[SystemBounds]:
def system_power_bounds(self) -> ReceiverFetcher[SystemBounds]:
"""Get receiver to receive new power bounds when they change.

Power bounds refer to the min and max power that a battery can
discharge or charge at and is also denoted as SoP.

Power bounds formulas are described in the receiver return type.
None will be send if there is no component to calculate metrics.
None will be sent if there is no component to calculate metrics.

A receiver from the MetricAggregator can be obtained by calling the
`new_receiver` method.
Expand All @@ -378,23 +274,16 @@ def _system_power_bounds(self) -> ReceiverFetcher[SystemBounds]:
A MetricAggregator that will calculate and stream the power bounds
of all batteries in the pool.
"""
assert isinstance(self._pool_ref_store, BatteryPoolReferenceStore)

method_name = SendOnUpdate.name() + "_" + PowerBoundsCalculator.name()

if method_name not in self._pool_ref_store._active_methods:
calculator = PowerBoundsCalculator(self._pool_ref_store._batteries)
calculator = PowerBoundsCalculator(self._pool_ref_store.component_ids)
self._pool_ref_store._active_methods[method_name] = SendOnUpdate(
metric_calculator=calculator,
working_batteries=self._pool_ref_store._working_batteries,
min_update_interval=self._pool_ref_store._min_update_interval,
)

return self._pool_ref_store._active_methods[method_name]

async def stop(self) -> None:
"""Stop all tasks and channels owned by the BatteryPool."""
# This was closing the pool_ref_store, which is not correct, because those are
# shared.
#
# This method will do until we have a mechanism to track the resources created
# through it. It can also eventually cleanup the pool_ref_store, when it is
# holding the last reference to it.
Loading
Loading