Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 49 additions & 18 deletions benchmarks/timeseries/benchmark_datasourcing.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError
from frequenz.client.microgrid.metrics import Metric
from frequenz.quantities import Quantity

from frequenz.sdk import microgrid
from frequenz.sdk._internal._channels import ChannelRegistry
from frequenz.sdk.microgrid._data_sourcing import (
ComponentMetricRequest,
DataSourcingActor,
)
from frequenz.sdk.timeseries import Sample

try:
from tests.timeseries.mock_microgrid import MockMicrogrid
Expand Down Expand Up @@ -70,11 +72,18 @@ async def benchmark_data_sourcing( # pylint: disable=too-many-locals
num_ev_chargers * len(COMPONENT_METRIC_IDS) * num_msgs_per_battery
)
mock_grid = MockMicrogrid(
grid_meter=False, num_values=num_msgs_per_battery, sample_rate_s=0.0
grid_meter=False,
api_client_streaming=True,
num_values=num_msgs_per_battery,
sample_rate_s=0.0,
)

mock_grid.add_ev_chargers(num_ev_chargers)
mock_grid.start_mock_client(enable_mock_client)

# Initialize the mock client without starting streaming tasks, so that
# the DataSourcingActor can subscribe to API data streams before any
# data is sent.
mock_grid.init_mock_client(enable_mock_client)

request_channel = Broadcast[ComponentMetricRequest](
name="DataSourcingActor Request Channel"
Expand All @@ -89,7 +98,6 @@ async def benchmark_data_sourcing( # pylint: disable=too-many-locals

consume_tasks = []

start_time = perf_counter()
samples_sent = 0

async def consume(channel: Receiver[Any]) -> None:
Expand All @@ -103,25 +111,48 @@ async def consume(channel: Receiver[Any]) -> None:
nonlocal samples_sent
samples_sent += 1

for evc_id in mock_grid.evc_ids:
for component_metric_id in COMPONENT_METRIC_IDS:
request = ComponentMetricRequest(
"current_phase_requests", evc_id, component_metric_id, None
)

recv_channel = channel_registry.get_or_create(
ComponentMetricRequest, request.get_channel_name()
).new_receiver()

await request_sender.send(request)
consume_tasks.append(asyncio.create_task(consume(recv_channel)))

async with DataSourcingActor(request_receiver, channel_registry):
await asyncio.gather(*consume_tasks)
# Send requests while the actor is running so it can process them
# and subscribe to API data streams before streaming begins.
for evc_id in mock_grid.evc_ids:
for component_metric_id in COMPONENT_METRIC_IDS:
request = ComponentMetricRequest(
"current_phase_requests", evc_id, component_metric_id, None
)

recv_channel = channel_registry.get_or_create(
Sample[Quantity], request.get_channel_name()
).new_receiver()

await request_sender.send(request)
consume_tasks.append(asyncio.create_task(consume(recv_channel)))

# Yield to let the actor process all requests and subscribe to
# the API data streams before we start sending mock data.
await asyncio.sleep(0.1)

# Now start the mock data streaming tasks.
start_time = perf_counter()
streaming_tasks = []
for (
comp_id,
coro,
) in mock_grid._streaming_coros: # pylint: disable=protected-access
streaming_tasks.append(asyncio.create_task(coro, name=f"stream:{comp_id}"))

# Wait for all streaming tasks to complete, then give the actor
# a moment to process any remaining in-flight messages.
await asyncio.gather(*streaming_tasks)
await asyncio.sleep(0.1)
for task in consume_tasks:
task.cancel()

time_taken = perf_counter() - start_time

await mock_grid.cleanup()
# We need to do this manually because mock_grid.cleanup() depends on using a
# mocker instance, which we don't have here.
# pylint: disable-next=protected-access
microgrid.connection_manager._CONNECTION_MANAGER = None

print(f"Samples Sent: {samples_sent}, time taken: {time_taken}")
print(f"Samples per second: {samples_sent / time_taken}")
Expand Down
Loading