Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,11 @@
## Bug Fixes

<!-- Here goes notable bug fixes that are worth a special mention or explanation -->

## New Features

* A new `tick_delay` option was added to `ResamplerConfig` and `ResamplerConfig2` to delay resampling execution after each timer tick. The delay was designed to postpone processing while keeping window boundaries aligned to the original tick times, which can be used for cascaded resampling pipelines. This option is experimental and may be changed or deprecated in a future release.

## Bug Fixes

<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
Comment on lines +18 to +25
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And these should be removed, as they are duplicated.

33 changes: 33 additions & 0 deletions src/frequenz/sdk/timeseries/_resampling/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,25 @@ class ResamplerConfig:
time the resampler is created.
"""

tick_delay: timedelta = field(default=timedelta(0), kw_only=True)
"""Delay before processing each resampling tick.

This delays when resampling computation happens, while keeping the
resampling windows aligned to the original timer tick boundaries. This
delay is only a time-based buffer, not a strict synchronization mechanism.

Warning:
This is an experimental option and may be changed or deprecated in
the future.

Typical use case:
This can be used in cascaded resampling setups to reduce timing races
where downstream windows are processed before upstream resampled values
are emitted.
Comment on lines +215 to +218
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: I would use a standard admonition, in this case probably Example:


It must be non-negative and smaller than `resampling_period`.
"""

def __post_init__(self) -> None:
"""Check that config values are valid.

Expand Down Expand Up @@ -245,6 +264,13 @@ def __post_init__(self) -> None:
raise ValueError(
f"align_to ({self.align_to}) should be a timezone aware datetime"
)
if self.tick_delay < timedelta(0):
raise ValueError(f"tick_delay ({self.tick_delay}) should be non-negative")
if self.tick_delay >= self.resampling_period:
raise ValueError(
f"tick_delay ({self.tick_delay}) should be smaller than "
f"resampling_period ({self.resampling_period})"
)


class ResamplingFunction2(Protocol):
Expand Down Expand Up @@ -415,3 +441,10 @@ def __post_init__(self) -> None:
raise ValueError(
f"align_to ({self.align_to}) must be specified via timer_config"
)
if self.tick_delay < timedelta(0):
raise ValueError(f"tick_delay ({self.tick_delay}) should be non-negative")
if self.tick_delay >= self.resampling_period:
raise ValueError(
f"tick_delay ({self.tick_delay}) should be smaller than "
f"resampling_period ({self.resampling_period})"
)
5 changes: 5 additions & 0 deletions src/frequenz/sdk/timeseries/_resampling/_resampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ async def resample(self, *, one_shot: bool = False) -> None:
case unexpected:
assert_never(unexpected)

# Delay processing to let upstream cascaded resamplers emit their
# boundary samples first; window boundaries still use next_tick_time.
if self._config.tick_delay:
Comment thread
llucax marked this conversation as resolved.
await asyncio.sleep(self._config.tick_delay.total_seconds())

# We need to make a copy here because we need to match the results to the
# current resamplers, and since we await here, new resamplers could be added
# or removed from the dict while we awaiting the resampling, which would
Expand Down
Loading
Loading