Add tick_delay to mitigate boundary races in cascaded resampling#1394
Add tick_delay to mitigate boundary races in cascaded resampling#1394malteschaaf wants to merge 3 commits into
tick_delay to mitigate boundary races in cascaded resampling#1394Conversation
llucax
left a comment
There was a problem hiding this comment.
Also the commit feat(resampling): add configurable tick delay before processing body has missing identifiers:
- Add to resampler configs ...
- Restrict to be non-negative and strictly smaller than .
Should probably be:
- Add `tick_delay` to resampler configs with validation in both config variants.
- Delay processing after each timer tick while preserving the original window boundary timestamp.
- Restrict `tick_delay` to be non-negative and strictly smaller than `resampling_period`.
I would also mention here the the motivation for this new field rather than just describe the diff.
| await source_sender.send(sample_before_boundary) | ||
| await source_sender.send(sample_after_boundary) | ||
| await asyncio.sleep(0) | ||
|
|
||
| await resampler.resample(one_shot=True) |
There was a problem hiding this comment.
I think this doesn't really test the delay.
The introduced delay is placed after the timer tick has been consumed and after next_tick_time has been set, but before calling the per-source resampling helpers. This is the right placement for “delay processing, not window boundaries”, but the problem is the test sends both samples before calling resampler.resample(one_shot=True).
Then it asserts that only the sample before the boundary is included. That verifies the window boundary stays at boundary, but it does not verify that tick_delay creates time for a late-arriving sample to be received before resampling.
If the new asyncio.sleep(tick_delay) line were deleted, this test would still pass, because both samples are already buffered before the tick is processed.
To fix this you probably need to add a concurrent sender:
async def send_late_sample() -> None:
await asyncio.sleep(0.05)
await source_sender.send(sample_before_boundary)
await asyncio.sleep(0)
async with asyncio.TaskGroup() as tg:
tg.create_task(send_late_sample())
await resampler.resample(one_shot=True)I think you are also mixing up the timestamp of the sample, with the arrival time of the sample.
We should test for:
-
Sample with timestamp within the window
- Arriving at a time within the window (should be included in the resampling)
- Arriving between the end of the window and the tick_delay (should be included in the resampling)
- Arriving after the end of the window + tick_delay (should NOT be included in the resampling)
-
Sample with timestamp after the window end
- Arriving within window (should NOT be included in the resampling)
- Arriving between the end of the window and the tick_delay (should NOT be included in the resampling)
- Arriving after the end of the window + tick_delay (should NOT be included in the resampling)
After bouncing a few times with AI to come up with something that is actually readable that doesn't have a LOT of duplication, I ended up with this:
Proposed new tests
@pytest.fixture
def window_end() -> datetime:
"""The logical end of the resampling window."""
return datetime(2020, 1, 1, 0, 0, 10, tzinfo=timezone.utc)
@pytest.fixture
def resampling_period() -> timedelta:
"""The resampling period used by the test resampler."""
return timedelta(seconds=10)
@pytest.fixture
def tick_delay() -> timedelta:
"""The delay between timer tick and resampling processing."""
return timedelta(milliseconds=200)
@pytest.fixture
def after_tick_delay(tick_delay: timedelta) -> timedelta:
"""A deterministic delay that happens after tick_delay has elapsed."""
return tick_delay + timedelta(milliseconds=10)
@pytest.fixture
def source_receiver(
source_chan: Broadcast[Sample[Quantity]],
) -> Receiver[Sample[Quantity]]:
"""A receiver for samples sent to the test source channel."""
return source_chan.new_receiver()
@pytest.fixture
def source_sender(
source_chan: Broadcast[Sample[Quantity]],
) -> Sender[Sample[Quantity]]:
"""A sender for the test source channel."""
return source_chan.new_sender()
@pytest.fixture
def sink_mock() -> AsyncMock:
"""A sink mock used to collect resampled output samples."""
return AsyncMock(spec=Sink, return_value=True)
@pytest.fixture
def resampling_fun_mock() -> MagicMock:
"""A resampling function mock returning a fixed value."""
return MagicMock(spec=ResamplingFunction, return_value=42.0)
@pytest.fixture
async def tick_delay_resampler(
window_end: datetime,
source_receiver: Receiver[Sample[Quantity]],
sink_mock: AsyncMock,
resampling_fun_mock: MagicMock,
resampling_period: timedelta,
tick_delay: timedelta,
) -> AsyncIterator[Resampler]:
"""Create a resampler configured with tick_delay and one deterministic tick."""
async def timer() -> AsyncIterator[TickInfo]:
yield TickInfo(expected_tick_time=window_end, sleep_infos=[])
config = ResamplerConfig2(
resampling_period=resampling_period,
max_data_age_in_periods=1.0,
resampling_function=resampling_fun_mock,
closed=WindowSide.LEFT,
tick_delay=tick_delay,
)
resampler = Resampler(config)
# Use a deterministic timer tick so tests can control sample arrival time
# independently from the logical window boundary.
# pylint: disable=protected-access
resampler._timer = cast(Any, timer())
resampler.add_timeseries("test", source_receiver, sink_mock)
try:
yield resampler
finally:
await resampler.stop()
@pytest.fixture
def sample_at_window_start(
window_end: datetime,
resampling_period: timedelta,
) -> Sample[Quantity]:
"""A sample exactly at the included left boundary of the window."""
return Sample(window_end - resampling_period, value=Quantity(1.0))
@pytest.fixture
def sample_inside_window(window_end: datetime) -> Sample[Quantity]:
"""A sample inside the selected resampling window."""
return Sample(window_end - timedelta(milliseconds=100), value=Quantity(2.0))
@pytest.fixture
def sample_at_window_end(window_end: datetime) -> Sample[Quantity]:
"""A sample exactly at the excluded right boundary of the window."""
return Sample(window_end, value=Quantity(3.0))
@pytest.fixture
def sample_inside_tick_delay(
window_end: datetime,
tick_delay: timedelta,
) -> Sample[Quantity]:
"""A sample timestamped after the window end but before tick_delay ends."""
return Sample(window_end + (tick_delay / 2), value=Quantity(4.0))
@pytest.fixture
def sample_at_tick_delay_end(
window_end: datetime,
tick_delay: timedelta,
) -> Sample[Quantity]:
"""A sample timestamped exactly at window_end + tick_delay."""
return Sample(window_end + tick_delay, value=Quantity(5.0))
@pytest.fixture
def sample_after_tick_delay_end(
window_end: datetime,
after_tick_delay: timedelta,
) -> Sample[Quantity]:
"""A sample timestamped after window_end + tick_delay."""
return Sample(window_end + after_tick_delay, value=Quantity(6.0))
async def test_tick_delay_keeps_window_boundaries_for_samples_arriving_before_tick(
tick_delay_resampler: Resampler,
source_receiver: Receiver[Sample[Quantity]],
source_sender: Sender[Sample[Quantity]],
sink_mock: AsyncMock,
resampling_fun_mock: MagicMock,
window_end: datetime,
sample_at_window_start: Sample[Quantity],
sample_inside_window: Sample[Quantity],
sample_at_window_end: Sample[Quantity],
sample_inside_tick_delay: Sample[Quantity],
sample_at_tick_delay_end: Sample[Quantity],
) -> None:
"""Samples already buffered before the tick are filtered by timestamp only."""
await source_sender.send(sample_at_window_start)
await source_sender.send(sample_inside_window)
await source_sender.send(sample_at_window_end)
await source_sender.send(sample_inside_tick_delay)
await source_sender.send(sample_at_tick_delay_end)
# Let the resampler's background receiving task buffer the samples before
# the timer tick is processed.
await asyncio.sleep(0)
await tick_delay_resampler.resample(one_shot=True)
resampling_fun_mock.assert_called_once_with(
a_sequence(
as_float_tuple(sample_at_window_start),
as_float_tuple(sample_inside_window),
),
tick_delay_resampler.config,
tick_delay_resampler.get_source_properties(source_receiver),
)
sink_mock.assert_called_once_with(Sample(window_end, Quantity(42.0)))
async def test_tick_delay_includes_window_samples_arriving_during_delay(
tick_delay_resampler: Resampler,
source_receiver: Receiver[Sample[Quantity]],
source_sender: Sender[Sample[Quantity]],
sink_mock: AsyncMock,
resampling_fun_mock: MagicMock,
window_end: datetime,
tick_delay: timedelta,
sample_at_window_start: Sample[Quantity],
sample_inside_window: Sample[Quantity],
sample_at_window_end: Sample[Quantity],
sample_inside_tick_delay: Sample[Quantity],
sample_at_tick_delay_end: Sample[Quantity],
) -> None:
"""Samples arriving after the tick but before processing are considered."""
async def send_samples_during_tick_delay() -> None:
# The timer tick has happened, but resampling is still delayed.
await asyncio.sleep((tick_delay / 2).total_seconds())
await source_sender.send(sample_at_window_start)
await source_sender.send(sample_inside_window)
await source_sender.send(sample_at_window_end)
await source_sender.send(sample_inside_tick_delay)
await source_sender.send(sample_at_tick_delay_end)
# Let the resampler's background receiving task buffer the samples
# before tick_delay expires.
await asyncio.sleep(0)
async with asyncio.TaskGroup() as task_group:
task_group.create_task(send_samples_during_tick_delay())
await tick_delay_resampler.resample(one_shot=True)
resampling_fun_mock.assert_called_once_with(
a_sequence(
as_float_tuple(sample_at_window_start),
as_float_tuple(sample_inside_window),
),
tick_delay_resampler.config,
tick_delay_resampler.get_source_properties(source_receiver),
)
sink_mock.assert_called_once_with(Sample(window_end, Quantity(42.0)))
async def test_tick_delay_excludes_future_timestamps_arriving_during_delay(
tick_delay_resampler: Resampler,
source_sender: Sender[Sample[Quantity]],
sink_mock: AsyncMock,
resampling_fun_mock: MagicMock,
window_end: datetime,
tick_delay: timedelta,
sample_inside_tick_delay: Sample[Quantity],
sample_at_tick_delay_end: Sample[Quantity],
sample_after_tick_delay_end: Sample[Quantity],
) -> None:
"""Future-window timestamps are excluded even if they arrive before processing."""
async def send_future_samples_during_tick_delay() -> None:
await asyncio.sleep((tick_delay / 2).total_seconds())
await source_sender.send(sample_inside_tick_delay)
await source_sender.send(sample_at_tick_delay_end)
await source_sender.send(sample_after_tick_delay_end)
await asyncio.sleep(0)
async with asyncio.TaskGroup() as task_group:
task_group.create_task(send_future_samples_during_tick_delay())
await tick_delay_resampler.resample(one_shot=True)
resampling_fun_mock.assert_not_called()
sink_mock.assert_called_once_with(Sample(window_end, None))9a252d2 to
fac039a
Compare
- Add `tick_delay` to resampler configs with validation in both config variants. - Delay processing after each timer tick while preserving the original window boundary timestamp. - Restrict tick_delay to be non-negative and strictly smaller than resampling_period. This mitigates boundary races in cascaded resampling by delaying downstream processing. Signed-off-by: Malte Schaaf <malte.schaaf@frequenz.com>
- Add validation tests for negative and oversized `tick_delay` values in both config variants. - Add fixture-based boundary tests that separate sample arrival time from sample timestamp filtering. - Add a with/without-delay late-arrival comparison to show that delayed processing can include in-window samples without shifting window boundaries. Signed-off-by: Malte Schaaf <malte.schaaf@frequenz.com>
Signed-off-by: Malte Schaaf <malte.schaaf@frequenz.com>
fac039a to
9e10893
Compare
|
Ready for review again. Changes:
|
llucax
left a comment
There was a problem hiding this comment.
I'm still not convinced about the test coverage (and structure of the parametrized test) 😬
| Typical use case: | ||
| This can be used in cascaded resampling setups to reduce timing races | ||
| where downstream windows are processed before upstream resampled values | ||
| are emitted. |
There was a problem hiding this comment.
Nitpick: I would use a standard admonition, in this case probably Example:
|
|
||
| ## New Features | ||
|
|
||
| <!-- Here goes the main new features and examples or instructions on how to use them --> |
There was a problem hiding this comment.
New features should be put here.
|
|
||
| ## New Features | ||
|
|
||
| * A new `tick_delay` option was added to `ResamplerConfig` and `ResamplerConfig2` to delay resampling execution after each timer tick. The delay was designed to postpone processing while keeping window boundaries aligned to the original tick times, which can be used for cascaded resampling pipelines. This option is experimental and may be changed or deprecated in a future release. | ||
|
|
||
| ## Bug Fixes | ||
|
|
||
| <!-- Here goes notable bug fixes that are worth a special mention or explanation --> |
There was a problem hiding this comment.
And these should be removed, as they are duplicated.
|
|
||
|
|
||
| @pytest.fixture | ||
| async def tick_delay_resampler_factory( |
There was a problem hiding this comment.
Note
I kept all these comments about ways to override fixtures and parametrization for educational purpuse, but I think we should actually remove this parametrized test as is. Please look at the last comment in this file.
If you need to customize how a fixture is created, you can just define a function, a fixture that returns a method to create an object kind of defeats the purpose to create a fixture.
But you can achieve the same in a much more idiomatic and simple way in pytest. You can just define the parameter with name tick_delay to override the fixture:
@pytest.mark.parametrize(
("tick_delay", "expect_included"), # <-- Note this is now just tick_delay, not tick_delay_override
[
pytest.param(timedelta(0), False, id="without-tick-delay"),
pytest.param(timedelta(milliseconds=200), True, id="with-tick-delay"),
],
)
async def test_tick_delay_late_arrival_with_and_without_delay(
tick_delay_resampler: Resampler, # <--- Normal resampler fixture here
...,
) -> None:
"""Compare late-arrival behavior with and without delayed processing."""
# No need to call any factory here
...And you can remove this factory.
| return MagicMock(spec=ResamplingFunction, return_value=42.0) | ||
|
|
||
|
|
||
| # Tick-delay test fixtures: resampler setup |
There was a problem hiding this comment.
If you remove the factory I would remove this comment, as there is only one fixture for this.
| # Tick-delay test fixtures: resampler setup |
| tick_delay_resampler = await tick_delay_resampler_factory(tick_delay_override) | ||
|
|
There was a problem hiding this comment.
| tick_delay_resampler = await tick_delay_resampler_factory(tick_delay_override) |
| (timedelta(0), False), | ||
| (timedelta(milliseconds=200), True), |
There was a problem hiding this comment.
This will give test a more meaningful name.
| (timedelta(0), False), | |
| (timedelta(milliseconds=200), True), | |
| pytest.param(timedelta(0), False, id="without-delay"), | |
| pytest.param(timedelta(milliseconds=200), True, id="with-delay"), |
| (timedelta(milliseconds=200), True), | ||
| ], | ||
| ) | ||
| async def test_tick_delay_late_arrival_with_and_without_delay( |
There was a problem hiding this comment.
With good id you can shorten the test name:
| async def test_tick_delay_late_arrival_with_and_without_delay( | |
| async def test_tick_delay_late_arrival( |
The result is:
tests/timeseries/test_resampling.py::test_tick_delay_late_arrival_with_and_without_delay[tick_delay0-False] PASSED [ 50%]
tests/timeseries/test_resampling.py::test_tick_delay_late_arrival_with_and_without_delay[tick_delay1-True] PASSED [100%]
vs
tests/timeseries/test_resampling.py::test_tick_delay_late_arrival[without-delay] PASSED [ 68%]
tests/timeseries/test_resampling.py::test_tick_delay_late_arrival[with-delay] PASSED [ 69%]
| if expect_included: | ||
| resampling_fun_mock.assert_called_once_with( | ||
| a_sequence( | ||
| as_float_tuple(sample_at_window_start), | ||
| as_float_tuple(sample_inside_window), | ||
| ), | ||
| tick_delay_resampler.config, | ||
| tick_delay_resampler.get_source_properties(source_receiver), | ||
| ) | ||
| sink_mock.assert_called_once_with(Sample(window_end, Quantity(42.0))) | ||
| else: | ||
| resampling_fun_mock.assert_not_called() | ||
| sink_mock.assert_called_once_with(Sample(window_end, None)) |
There was a problem hiding this comment.
I don't think this is actually good parametrization. If you need to change the test logic depending on the parameters, it should be a separate test, not a parameter. Ideally the test logic should be very simple, ideally sequential, no branching at all.
| resampling_fun_mock.assert_not_called() | ||
| sink_mock.assert_called_once_with(Sample(window_end, None)) | ||
|
|
||
|
|
There was a problem hiding this comment.
The new tests are much better than the original test, but not all the suggested tests were added here. It is still missing the case where a sample timestamp is inside the selected window, but the sample arrives after a non-zero tick_delay has elapsed (it should not be included).
The current zero-delay half of test_tick_delay_late_arrival_with_and_without_delay() tests “without the implementation delay, the late sample is missed”, but it does not test the cutoff behavior for a configured tick_delay=200ms.
I'd suggest adding a new test for that.
But if you split test_tick_delay_late_arrival_with_and_without_delay() ( and I think you should), then it probably will make sense to extend the half using a 200ms delay and the test proposed here.
👇
I suggest the following test structure (click to expand)
I’d use four tests, that avoids parametrization (for different scenarios) and keeps each test single-purpose.
Keeping test_tick_delay_prebuffered_samples_follow_timestamp_boundaries():
Replace the parametrized test with this regression test
async def test_without_tick_delay_late_window_samples_are_missed(
tick_delay_resampler_factory: Callable[[timedelta], Awaitable[Resampler]],
source_sender: Sender[Sample[Quantity]],
sink_mock: AsyncMock,
resampling_fun_mock: MagicMock,
window_end: datetime,
sample_at_window_start: Sample[Quantity],
sample_inside_window: Sample[Quantity],
) -> None:
"""Regression test: without `tick_delay`, late in-window samples are missed.
The samples have timestamps inside the selected window, but they arrive
after the timer tick. With zero delay, processing happens immediately, so
they are not buffered in time for this resampling run.
"""
resampler = await tick_delay_resampler_factory(timedelta(0))
async def send_samples_after_tick() -> None:
await asyncio.sleep(0.05)
await source_sender.send(sample_at_window_start)
await source_sender.send(sample_inside_window)
await asyncio.sleep(0)
async with asyncio.TaskGroup() as task_group:
task_group.create_task(send_samples_after_tick())
await resampler.resample(one_shot=True)
resampling_fun_mock.assert_not_called()
sink_mock.assert_called_once_with(Sample(window_end, None))Keep the positive delay case separate
async def test_tick_delay_includes_only_window_samples_arriving_during_delay(
tick_delay_resampler: Resampler,
source_receiver: Receiver[Sample[Quantity]],
source_sender: Sender[Sample[Quantity]],
sink_mock: AsyncMock,
resampling_fun_mock: MagicMock,
window_end: datetime,
tick_delay: timedelta,
sample_at_window_start: Sample[Quantity],
sample_inside_window: Sample[Quantity],
sample_at_window_end: Sample[Quantity],
sample_inside_tick_delay: Sample[Quantity],
sample_at_tick_delay_end: Sample[Quantity],
sample_after_tick_delay_end: Sample[Quantity],
) -> None:
"""Late arrivals are included only when their timestamps are in-window.
Samples arrive after the timer tick but before delayed processing happens.
The in-window samples should be included; timestamps at or after the window
end should still be excluded.
"""
async def send_samples_during_tick_delay() -> None:
await asyncio.sleep((tick_delay / 2).total_seconds())
await source_sender.send(sample_at_window_start)
await source_sender.send(sample_inside_window)
await source_sender.send(sample_at_window_end)
await source_sender.send(sample_inside_tick_delay)
await source_sender.send(sample_at_tick_delay_end)
await source_sender.send(sample_after_tick_delay_end)
await asyncio.sleep(0)
async with asyncio.TaskGroup() as task_group:
task_group.create_task(send_samples_during_tick_delay())
await tick_delay_resampler.resample(one_shot=True)
resampling_fun_mock.assert_called_once_with(
a_sequence(
as_float_tuple(sample_at_window_start),
as_float_tuple(sample_inside_window),
),
tick_delay_resampler.config,
tick_delay_resampler.get_source_properties(source_receiver),
)
sink_mock.assert_called_once_with(Sample(window_end, Quantity(42.0)))Add the after-delay cutoff test
async def test_tick_delay_excludes_window_samples_arriving_after_delay(
tick_delay_resampler: Resampler,
source_sender: Sender[Sample[Quantity]],
sink_mock: AsyncMock,
resampling_fun_mock: MagicMock,
window_end: datetime,
after_tick_delay: timedelta,
sample_at_window_start: Sample[Quantity],
sample_inside_window: Sample[Quantity],
) -> None:
"""In-window samples arriving after tick_delay are not considered.
The sample timestamps belong to the selected window, but their arrival time
is after delayed processing has already started.
"""
async def send_samples_after_tick_delay() -> None:
await asyncio.sleep(after_tick_delay.total_seconds())
await source_sender.send(sample_at_window_start)
await source_sender.send(sample_inside_window)
await asyncio.sleep(0)
async with asyncio.TaskGroup() as task_group:
task_group.create_task(send_samples_after_tick_delay())
await tick_delay_resampler.resample(one_shot=True)
resampling_fun_mock.assert_not_called()
sink_mock.assert_called_once_with(Sample(window_end, None))☝️
Problem
In cascaded timer-based resamplers (for example
1s -> 10s), a race can happen at window boundaries:the downstream resampler can tick and compute its window before the upstream resampler has emitted the last sample for the previous window.
Example with
max_data_age_in_periods=1:[9,10)and emits its result (label9) slightly aftert=10.[0,10)att=10and does not see that label-9sample yet.t=20), that delayed sample is too old and is no longer considered.Result: the affected downstream window stays permanently incomplete.
Change
tick_delaytoResamplerConfigandResamplerConfig2.tick_delayis keyword-only to avoid positional-argument API breakage.tick_delayafter a tick, while still computing with the original window boundary.tick_delaymust be>= 0and< resampling_period.tick_delayas experimental (it may be changed or deprecated in the future).Tests
tick_delayvalues (< 0,>= resampling_period).tick_delaydiffers) to make the delay effect explicit.