Skip to content

Add tick_delay to mitigate boundary races in cascaded resampling#1394

Open
malteschaaf wants to merge 3 commits into
frequenz-floss:v1.x.xfrom
malteschaaf:cascaded_resampler
Open

Add tick_delay to mitigate boundary races in cascaded resampling#1394
malteschaaf wants to merge 3 commits into
frequenz-floss:v1.x.xfrom
malteschaaf:cascaded_resampler

Conversation

@malteschaaf
Copy link
Copy Markdown
Contributor

@malteschaaf malteschaaf commented Apr 23, 2026

Problem

In cascaded timer-based resamplers (for example 1s -> 10s), a race can happen at window boundaries:
the downstream resampler can tick and compute its window before the upstream resampler has emitted the last sample for the previous window.

Example with max_data_age_in_periods=1:

  • The upstream resampler computes [9,10) and emits its result (label 9) slightly after t=10.
  • The downstream resampler computes [0,10) at t=10 and does not see that label-9 sample yet.
  • At the next tick (t=20), that delayed sample is too old and is no longer considered.

Result: the affected downstream window stays permanently incomplete.

Change

  • Added tick_delay to ResamplerConfig and ResamplerConfig2.
  • tick_delay is keyword-only to avoid positional-argument API breakage.
  • The resampler can now optionally wait tick_delay after a tick, while still computing with the original window boundary.
  • This delays processing without shifting the actual window definitions.
  • Added validation: tick_delay must be >= 0 and < resampling_period.
  • Added documentation for the cascaded-resampling use case.
  • Marked tick_delay as experimental (it may be changed or deprecated in the future).

Tests

  • Added validation tests for invalid tick_delay values (< 0, >= resampling_period).
  • Added fixture-based timing tests that separate sample arrival time from timestamp-based window filtering.
  • Added a with/without-delay late-arrival comparison (same timestamps and arrival timing, only tick_delay differs) to make the delay effect explicit.

@malteschaaf malteschaaf requested a review from a team as a code owner April 23, 2026 11:02
@malteschaaf malteschaaf requested review from Marenz and removed request for a team April 23, 2026 11:02
@github-actions github-actions Bot added part:docs Affects the documentation part:tests Affects the unit, integration and performance (benchmarks) tests part:data-pipeline Affects the data pipeline labels Apr 23, 2026
@malteschaaf malteschaaf requested a review from llucax April 23, 2026 11:02
Copy link
Copy Markdown
Contributor

@llucax llucax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also the commit feat(resampling): add configurable tick delay before processing body has missing identifiers:

- Add  to resampler configs ...
- Restrict  to be non-negative and strictly smaller than .

Should probably be:

- Add `tick_delay` to resampler configs with validation in both config variants.
- Delay processing after each timer tick while preserving the original window boundary timestamp.
- Restrict `tick_delay` to be non-negative and strictly smaller than `resampling_period`.

I would also mention here the the motivation for this new field rather than just describe the diff.

Comment thread src/frequenz/sdk/timeseries/_resampling/_config.py Outdated
Comment thread src/frequenz/sdk/timeseries/_resampling/_resampler.py
Comment thread tests/timeseries/test_resampling.py Outdated
Comment on lines +256 to +260
await source_sender.send(sample_before_boundary)
await source_sender.send(sample_after_boundary)
await asyncio.sleep(0)

await resampler.resample(one_shot=True)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this doesn't really test the delay.

The introduced delay is placed after the timer tick has been consumed and after next_tick_time has been set, but before calling the per-source resampling helpers. This is the right placement for “delay processing, not window boundaries”, but the problem is the test sends both samples before calling resampler.resample(one_shot=True).

Then it asserts that only the sample before the boundary is included. That verifies the window boundary stays at boundary, but it does not verify that tick_delay creates time for a late-arriving sample to be received before resampling.

If the new asyncio.sleep(tick_delay) line were deleted, this test would still pass, because both samples are already buffered before the tick is processed.

To fix this you probably need to add a concurrent sender:

async def send_late_sample() -> None:
    await asyncio.sleep(0.05)
    await source_sender.send(sample_before_boundary)
    await asyncio.sleep(0)

async with asyncio.TaskGroup() as tg:
    tg.create_task(send_late_sample())
    await resampler.resample(one_shot=True)

I think you are also mixing up the timestamp of the sample, with the arrival time of the sample.

We should test for:

  • Sample with timestamp within the window

    • Arriving at a time within the window (should be included in the resampling)
    • Arriving between the end of the window and the tick_delay (should be included in the resampling)
    • Arriving after the end of the window + tick_delay (should NOT be included in the resampling)
  • Sample with timestamp after the window end

    • Arriving within window (should NOT be included in the resampling)
    • Arriving between the end of the window and the tick_delay (should NOT be included in the resampling)
    • Arriving after the end of the window + tick_delay (should NOT be included in the resampling)

After bouncing a few times with AI to come up with something that is actually readable that doesn't have a LOT of duplication, I ended up with this:

Proposed new tests
@pytest.fixture
def window_end() -> datetime:
    """The logical end of the resampling window."""
    return datetime(2020, 1, 1, 0, 0, 10, tzinfo=timezone.utc)


@pytest.fixture
def resampling_period() -> timedelta:
    """The resampling period used by the test resampler."""
    return timedelta(seconds=10)


@pytest.fixture
def tick_delay() -> timedelta:
    """The delay between timer tick and resampling processing."""
    return timedelta(milliseconds=200)


@pytest.fixture
def after_tick_delay(tick_delay: timedelta) -> timedelta:
    """A deterministic delay that happens after tick_delay has elapsed."""
    return tick_delay + timedelta(milliseconds=10)


@pytest.fixture
def source_receiver(
    source_chan: Broadcast[Sample[Quantity]],
) -> Receiver[Sample[Quantity]]:
    """A receiver for samples sent to the test source channel."""
    return source_chan.new_receiver()


@pytest.fixture
def source_sender(
    source_chan: Broadcast[Sample[Quantity]],
) -> Sender[Sample[Quantity]]:
    """A sender for the test source channel."""
    return source_chan.new_sender()


@pytest.fixture
def sink_mock() -> AsyncMock:
    """A sink mock used to collect resampled output samples."""
    return AsyncMock(spec=Sink, return_value=True)


@pytest.fixture
def resampling_fun_mock() -> MagicMock:
    """A resampling function mock returning a fixed value."""
    return MagicMock(spec=ResamplingFunction, return_value=42.0)


@pytest.fixture
async def tick_delay_resampler(
    window_end: datetime,
    source_receiver: Receiver[Sample[Quantity]],
    sink_mock: AsyncMock,
    resampling_fun_mock: MagicMock,
    resampling_period: timedelta,
    tick_delay: timedelta,
) -> AsyncIterator[Resampler]:
    """Create a resampler configured with tick_delay and one deterministic tick."""

    async def timer() -> AsyncIterator[TickInfo]:
        yield TickInfo(expected_tick_time=window_end, sleep_infos=[])

    config = ResamplerConfig2(
        resampling_period=resampling_period,
        max_data_age_in_periods=1.0,
        resampling_function=resampling_fun_mock,
        closed=WindowSide.LEFT,
        tick_delay=tick_delay,
    )
    resampler = Resampler(config)

    # Use a deterministic timer tick so tests can control sample arrival time
    # independently from the logical window boundary.
    # pylint: disable=protected-access
    resampler._timer = cast(Any, timer())

    resampler.add_timeseries("test", source_receiver, sink_mock)

    try:
        yield resampler
    finally:
        await resampler.stop()

@pytest.fixture
def sample_at_window_start(
    window_end: datetime,
    resampling_period: timedelta,
) -> Sample[Quantity]:
    """A sample exactly at the included left boundary of the window."""
    return Sample(window_end - resampling_period, value=Quantity(1.0))


@pytest.fixture
def sample_inside_window(window_end: datetime) -> Sample[Quantity]:
    """A sample inside the selected resampling window."""
    return Sample(window_end - timedelta(milliseconds=100), value=Quantity(2.0))


@pytest.fixture
def sample_at_window_end(window_end: datetime) -> Sample[Quantity]:
    """A sample exactly at the excluded right boundary of the window."""
    return Sample(window_end, value=Quantity(3.0))


@pytest.fixture
def sample_inside_tick_delay(
    window_end: datetime,
    tick_delay: timedelta,
) -> Sample[Quantity]:
    """A sample timestamped after the window end but before tick_delay ends."""
    return Sample(window_end + (tick_delay / 2), value=Quantity(4.0))


@pytest.fixture
def sample_at_tick_delay_end(
    window_end: datetime,
    tick_delay: timedelta,
) -> Sample[Quantity]:
    """A sample timestamped exactly at window_end + tick_delay."""
    return Sample(window_end + tick_delay, value=Quantity(5.0))


@pytest.fixture
def sample_after_tick_delay_end(
    window_end: datetime,
    after_tick_delay: timedelta,
) -> Sample[Quantity]:
    """A sample timestamped after window_end + tick_delay."""
    return Sample(window_end + after_tick_delay, value=Quantity(6.0))

async def test_tick_delay_keeps_window_boundaries_for_samples_arriving_before_tick(
    tick_delay_resampler: Resampler,
    source_receiver: Receiver[Sample[Quantity]],
    source_sender: Sender[Sample[Quantity]],
    sink_mock: AsyncMock,
    resampling_fun_mock: MagicMock,
    window_end: datetime,
    sample_at_window_start: Sample[Quantity],
    sample_inside_window: Sample[Quantity],
    sample_at_window_end: Sample[Quantity],
    sample_inside_tick_delay: Sample[Quantity],
    sample_at_tick_delay_end: Sample[Quantity],
) -> None:
    """Samples already buffered before the tick are filtered by timestamp only."""
    await source_sender.send(sample_at_window_start)
    await source_sender.send(sample_inside_window)
    await source_sender.send(sample_at_window_end)
    await source_sender.send(sample_inside_tick_delay)
    await source_sender.send(sample_at_tick_delay_end)

    # Let the resampler's background receiving task buffer the samples before
    # the timer tick is processed.
    await asyncio.sleep(0)

    await tick_delay_resampler.resample(one_shot=True)

    resampling_fun_mock.assert_called_once_with(
        a_sequence(
            as_float_tuple(sample_at_window_start),
            as_float_tuple(sample_inside_window),
        ),
        tick_delay_resampler.config,
        tick_delay_resampler.get_source_properties(source_receiver),
    )
    sink_mock.assert_called_once_with(Sample(window_end, Quantity(42.0)))

async def test_tick_delay_includes_window_samples_arriving_during_delay(
    tick_delay_resampler: Resampler,
    source_receiver: Receiver[Sample[Quantity]],
    source_sender: Sender[Sample[Quantity]],
    sink_mock: AsyncMock,
    resampling_fun_mock: MagicMock,
    window_end: datetime,
    tick_delay: timedelta,
    sample_at_window_start: Sample[Quantity],
    sample_inside_window: Sample[Quantity],
    sample_at_window_end: Sample[Quantity],
    sample_inside_tick_delay: Sample[Quantity],
    sample_at_tick_delay_end: Sample[Quantity],
) -> None:
    """Samples arriving after the tick but before processing are considered."""

    async def send_samples_during_tick_delay() -> None:
        # The timer tick has happened, but resampling is still delayed.
        await asyncio.sleep((tick_delay / 2).total_seconds())

        await source_sender.send(sample_at_window_start)
        await source_sender.send(sample_inside_window)
        await source_sender.send(sample_at_window_end)
        await source_sender.send(sample_inside_tick_delay)
        await source_sender.send(sample_at_tick_delay_end)

        # Let the resampler's background receiving task buffer the samples
        # before tick_delay expires.
        await asyncio.sleep(0)

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send_samples_during_tick_delay())
        await tick_delay_resampler.resample(one_shot=True)

    resampling_fun_mock.assert_called_once_with(
        a_sequence(
            as_float_tuple(sample_at_window_start),
            as_float_tuple(sample_inside_window),
        ),
        tick_delay_resampler.config,
        tick_delay_resampler.get_source_properties(source_receiver),
    )
    sink_mock.assert_called_once_with(Sample(window_end, Quantity(42.0)))

async def test_tick_delay_excludes_future_timestamps_arriving_during_delay(
    tick_delay_resampler: Resampler,
    source_sender: Sender[Sample[Quantity]],
    sink_mock: AsyncMock,
    resampling_fun_mock: MagicMock,
    window_end: datetime,
    tick_delay: timedelta,
    sample_inside_tick_delay: Sample[Quantity],
    sample_at_tick_delay_end: Sample[Quantity],
    sample_after_tick_delay_end: Sample[Quantity],
) -> None:
    """Future-window timestamps are excluded even if they arrive before processing."""

    async def send_future_samples_during_tick_delay() -> None:
        await asyncio.sleep((tick_delay / 2).total_seconds())

        await source_sender.send(sample_inside_tick_delay)
        await source_sender.send(sample_at_tick_delay_end)
        await source_sender.send(sample_after_tick_delay_end)

        await asyncio.sleep(0)

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send_future_samples_during_tick_delay())
        await tick_delay_resampler.resample(one_shot=True)

    resampling_fun_mock.assert_not_called()
    sink_mock.assert_called_once_with(Sample(window_end, None))

@malteschaaf malteschaaf force-pushed the cascaded_resampler branch from 9a252d2 to fac039a Compare May 7, 2026 12:24
- Add `tick_delay` to resampler configs with validation in both config variants.
- Delay processing after each timer tick while preserving the original window boundary timestamp.
- Restrict tick_delay to be non-negative and strictly smaller than resampling_period.

This mitigates boundary races in cascaded resampling by delaying downstream processing.

Signed-off-by: Malte Schaaf <malte.schaaf@frequenz.com>
- Add validation tests for negative and oversized `tick_delay` values in both config variants.
- Add fixture-based boundary tests that separate sample arrival time from sample timestamp filtering.
- Add a with/without-delay late-arrival comparison to show that delayed processing can include in-window samples without shifting window boundaries.

Signed-off-by: Malte Schaaf <malte.schaaf@frequenz.com>
Signed-off-by: Malte Schaaf <malte.schaaf@frequenz.com>
@malteschaaf malteschaaf force-pushed the cascaded_resampler branch from fac039a to 9e10893 Compare May 7, 2026 12:25
@malteschaaf
Copy link
Copy Markdown
Contributor Author

Ready for review again.

Changes:

  • Reworded the feat(resampling): add configurable tick delay before processing commit message to restore the missing identifiers and include the motivation for the change.
  • Updated tick_delay API/docs:
    • Marked tick_delay as keyword-only (kw_only=True) to avoid positional-argument breakage.
    • Expanded docs to clarify the typical cascaded-resampling use case.
    • Added an explicit note that tick_delay is experimental and may change/deprecate in the future.
  • Added inline comment in the resampler loop (if self._config.tick_delay:) to explain why the delay is applied and to clarify that window boundaries are still based on the original tick time.
  • Reworked tick-delay tests based on feedback:
    • Replaced the previous compact scenario test with fixture-based boundary tests that clearly separate arrival time vs timestamp filtering.
    • Added explicit late-arrival behavior checks.
    • Added a with/without-delay A/B parameterization in the existing late-arrival test (same timestamps + same arrival timing, only tick_delay differs) to make the delay effect directly visible.
    • Improved test names and docstrings for clarity.
  • Updated release-note wording to mention that tick_delay is experimental.

@malteschaaf malteschaaf requested a review from llucax May 7, 2026 12:31
Copy link
Copy Markdown
Contributor

@llucax llucax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still not convinced about the test coverage (and structure of the parametrized test) 😬

Comment on lines +215 to +218
Typical use case:
This can be used in cascaded resampling setups to reduce timing races
where downstream windows are processed before upstream resampled values
are emitted.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: I would use a standard admonition, in this case probably Example:

Comment thread RELEASE_NOTES.md

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New features should be put here.

Comment thread RELEASE_NOTES.md
Comment on lines +18 to +25

## New Features

* A new `tick_delay` option was added to `ResamplerConfig` and `ResamplerConfig2` to delay resampling execution after each timer tick. The delay was designed to postpone processing while keeping window boundaries aligned to the original tick times, which can be used for cascaded resampling pipelines. This option is experimental and may be changed or deprecated in a future release.

## Bug Fixes

<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And these should be removed, as they are duplicated.



@pytest.fixture
async def tick_delay_resampler_factory(
Copy link
Copy Markdown
Contributor

@llucax llucax May 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note

I kept all these comments about ways to override fixtures and parametrization for educational purpuse, but I think we should actually remove this parametrized test as is. Please look at the last comment in this file.

If you need to customize how a fixture is created, you can just define a function, a fixture that returns a method to create an object kind of defeats the purpose to create a fixture.
But you can achieve the same in a much more idiomatic and simple way in pytest. You can just define the parameter with name tick_delay to override the fixture:

@pytest.mark.parametrize(
    ("tick_delay", "expect_included"),  # <-- Note this is now just tick_delay, not tick_delay_override
    [
        pytest.param(timedelta(0), False, id="without-tick-delay"),
        pytest.param(timedelta(milliseconds=200), True, id="with-tick-delay"),
    ],
)
async def test_tick_delay_late_arrival_with_and_without_delay(
    tick_delay_resampler: Resampler,   # <--- Normal resampler fixture here
    ...,
) -> None:
    """Compare late-arrival behavior with and without delayed processing."""
    # No need to call any factory here
    ...

And you can remove this factory.

return MagicMock(spec=ResamplingFunction, return_value=42.0)


# Tick-delay test fixtures: resampler setup
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you remove the factory I would remove this comment, as there is only one fixture for this.

Suggested change
# Tick-delay test fixtures: resampler setup

Comment on lines +470 to +471
tick_delay_resampler = await tick_delay_resampler_factory(tick_delay_override)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
tick_delay_resampler = await tick_delay_resampler_factory(tick_delay_override)

Comment on lines +442 to +443
(timedelta(0), False),
(timedelta(milliseconds=200), True),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will give test a more meaningful name.

Suggested change
(timedelta(0), False),
(timedelta(milliseconds=200), True),
pytest.param(timedelta(0), False, id="without-delay"),
pytest.param(timedelta(milliseconds=200), True, id="with-delay"),

(timedelta(milliseconds=200), True),
],
)
async def test_tick_delay_late_arrival_with_and_without_delay(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With good id you can shorten the test name:

Suggested change
async def test_tick_delay_late_arrival_with_and_without_delay(
async def test_tick_delay_late_arrival(

The result is:

tests/timeseries/test_resampling.py::test_tick_delay_late_arrival_with_and_without_delay[tick_delay0-False] PASSED [ 50%]                                                                                                                                                                                                  
tests/timeseries/test_resampling.py::test_tick_delay_late_arrival_with_and_without_delay[tick_delay1-True] PASSED [100%]                                                                                                                                                                                                   

vs

tests/timeseries/test_resampling.py::test_tick_delay_late_arrival[without-delay] PASSED [ 68%]                                                                                                                                                                                                                        
tests/timeseries/test_resampling.py::test_tick_delay_late_arrival[with-delay] PASSED [ 69%]                                                                                                                                                                                                                           

Comment on lines +490 to +502
if expect_included:
resampling_fun_mock.assert_called_once_with(
a_sequence(
as_float_tuple(sample_at_window_start),
as_float_tuple(sample_inside_window),
),
tick_delay_resampler.config,
tick_delay_resampler.get_source_properties(source_receiver),
)
sink_mock.assert_called_once_with(Sample(window_end, Quantity(42.0)))
else:
resampling_fun_mock.assert_not_called()
sink_mock.assert_called_once_with(Sample(window_end, None))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is actually good parametrization. If you need to change the test logic depending on the parameters, it should be a separate test, not a parameter. Ideally the test logic should be very simple, ideally sequential, no branching at all.

resampling_fun_mock.assert_not_called()
sink_mock.assert_called_once_with(Sample(window_end, None))


Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new tests are much better than the original test, but not all the suggested tests were added here. It is still missing the case where a sample timestamp is inside the selected window, but the sample arrives after a non-zero tick_delay has elapsed (it should not be included).

The current zero-delay half of test_tick_delay_late_arrival_with_and_without_delay() tests “without the implementation delay, the late sample is missed”, but it does not test the cutoff behavior for a configured tick_delay=200ms.

I'd suggest adding a new test for that.

But if you split test_tick_delay_late_arrival_with_and_without_delay() ( and I think you should), then it probably will make sense to extend the half using a 200ms delay and the test proposed here.

👇

I suggest the following test structure (click to expand)

I’d use four tests, that avoids parametrization (for different scenarios) and keeps each test single-purpose.

Keeping test_tick_delay_prebuffered_samples_follow_timestamp_boundaries():

Replace the parametrized test with this regression test

async def test_without_tick_delay_late_window_samples_are_missed(
    tick_delay_resampler_factory: Callable[[timedelta], Awaitable[Resampler]],
    source_sender: Sender[Sample[Quantity]],
    sink_mock: AsyncMock,
    resampling_fun_mock: MagicMock,
    window_end: datetime,
    sample_at_window_start: Sample[Quantity],
    sample_inside_window: Sample[Quantity],
) -> None:
    """Regression test: without `tick_delay`, late in-window samples are missed.

    The samples have timestamps inside the selected window, but they arrive
    after the timer tick. With zero delay, processing happens immediately, so
    they are not buffered in time for this resampling run.
    """
    resampler = await tick_delay_resampler_factory(timedelta(0))

    async def send_samples_after_tick() -> None:
        await asyncio.sleep(0.05)

        await source_sender.send(sample_at_window_start)
        await source_sender.send(sample_inside_window)

        await asyncio.sleep(0)

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send_samples_after_tick())
        await resampler.resample(one_shot=True)

    resampling_fun_mock.assert_not_called()
    sink_mock.assert_called_once_with(Sample(window_end, None))

Keep the positive delay case separate

async def test_tick_delay_includes_only_window_samples_arriving_during_delay(
    tick_delay_resampler: Resampler,
    source_receiver: Receiver[Sample[Quantity]],
    source_sender: Sender[Sample[Quantity]],
    sink_mock: AsyncMock,
    resampling_fun_mock: MagicMock,
    window_end: datetime,
    tick_delay: timedelta,
    sample_at_window_start: Sample[Quantity],
    sample_inside_window: Sample[Quantity],
    sample_at_window_end: Sample[Quantity],
    sample_inside_tick_delay: Sample[Quantity],
    sample_at_tick_delay_end: Sample[Quantity],
    sample_after_tick_delay_end: Sample[Quantity],
) -> None:
    """Late arrivals are included only when their timestamps are in-window.

    Samples arrive after the timer tick but before delayed processing happens.
    The in-window samples should be included; timestamps at or after the window
    end should still be excluded.
    """

    async def send_samples_during_tick_delay() -> None:
        await asyncio.sleep((tick_delay / 2).total_seconds())

        await source_sender.send(sample_at_window_start)
        await source_sender.send(sample_inside_window)
        await source_sender.send(sample_at_window_end)
        await source_sender.send(sample_inside_tick_delay)
        await source_sender.send(sample_at_tick_delay_end)
        await source_sender.send(sample_after_tick_delay_end)

        await asyncio.sleep(0)

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send_samples_during_tick_delay())
        await tick_delay_resampler.resample(one_shot=True)

    resampling_fun_mock.assert_called_once_with(
        a_sequence(
            as_float_tuple(sample_at_window_start),
            as_float_tuple(sample_inside_window),
        ),
        tick_delay_resampler.config,
        tick_delay_resampler.get_source_properties(source_receiver),
    )
    sink_mock.assert_called_once_with(Sample(window_end, Quantity(42.0)))

Add the after-delay cutoff test

async def test_tick_delay_excludes_window_samples_arriving_after_delay(
    tick_delay_resampler: Resampler,
    source_sender: Sender[Sample[Quantity]],
    sink_mock: AsyncMock,
    resampling_fun_mock: MagicMock,
    window_end: datetime,
    after_tick_delay: timedelta,
    sample_at_window_start: Sample[Quantity],
    sample_inside_window: Sample[Quantity],
) -> None:
    """In-window samples arriving after tick_delay are not considered.

    The sample timestamps belong to the selected window, but their arrival time
    is after delayed processing has already started.
    """

    async def send_samples_after_tick_delay() -> None:
        await asyncio.sleep(after_tick_delay.total_seconds())

        await source_sender.send(sample_at_window_start)
        await source_sender.send(sample_inside_window)

        await asyncio.sleep(0)

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send_samples_after_tick_delay())
        await tick_delay_resampler.resample(one_shot=True)

    resampling_fun_mock.assert_not_called()
    sink_mock.assert_called_once_with(Sample(window_end, None))

☝️

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

part:data-pipeline Affects the data pipeline part:docs Affects the documentation part:tests Affects the unit, integration and performance (benchmarks) tests

Projects

Status: To do

Development

Successfully merging this pull request may close these issues.

2 participants