Skip to content

Commit 387c712

Browse files
authored
fix: Fix Snapshotter handling of out of order samples (#1735)
### Description: - Update `test_snapshot_pruning_removes_outdated_records` to remove the expected source of flaky behavior - The root cause of the flaky test behavior is a design flaw in snapshot handling: `event_manager.on(event=Event.SYSTEM_INFO, listener=self._snapshot_cpu)` adds `self._snapshot_cpu` listener, but event manager runs sync listeners through `asyncio.to_thread`. `self._snapshot_cpu` modifies in-place instance list (for example `self._cpu_snapshots`) - it does `bisect.insort` and `del` operations on the same list from several threads, which creates oportunity for a race condition. - Changed order of operations. The new `Snapshot` is now added first, and the pruning of the snapshot list is done based on `Snapshot` with the newest date in the list (the newest date does not have to be the last added Snapshot). This fixes the bug when the last out-of-order `Snapshot` could cause wrong pruning. ### Issues: Closes: #1734
1 parent a0b2937 commit 387c712

2 files changed

Lines changed: 38 additions & 24 deletions

File tree

src/crawlee/_autoscaling/snapshotter.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
from __future__ import annotations
44

5-
import bisect
65
import functools
6+
from bisect import insort
77
from datetime import datetime, timedelta, timezone
88
from logging import getLogger
99
from typing import TYPE_CHECKING, TypeVar, cast
@@ -38,7 +38,7 @@ class SortedSnapshotList(list[T]):
3838

3939
def add(self, item: T) -> None:
4040
"""Add an item to the list maintaining sorted order by `created_at` using binary search."""
41-
bisect.insort(self, item, key=lambda item: item.created_at)
41+
insort(self, item, key=lambda item: item.created_at)
4242

4343

4444
@docs_group('Autoscaling')
@@ -261,11 +261,13 @@ def _get_sample(snapshots: list[Snapshot], duration: timedelta | None = None) ->
261261
latest_time = snapshots[-1].created_at
262262
return [snapshot for snapshot in snapshots if latest_time - snapshot.created_at <= duration]
263263

264-
def _snapshot_cpu(self, event_data: EventSystemInfoData) -> None:
264+
async def _snapshot_cpu(self, event_data: EventSystemInfoData) -> None:
265265
"""Capture a snapshot of the current CPU usage.
266266
267267
This method does not perform CPU usage measurement. Instead, it just reads the data received through
268268
the `event_data` parameter, which is expected to be supplied by the event manager.
269+
Must be `async` to ensure it is not scheduled to be run in own thread by the event manager, which could cause
270+
race conditions in snapshots manipulation(sorting and pruning).
269271
270272
Args:
271273
event_data: System info data from which CPU usage is read.
@@ -277,14 +279,16 @@ def _snapshot_cpu(self, event_data: EventSystemInfoData) -> None:
277279
)
278280

279281
snapshots = cast('list[Snapshot]', self._cpu_snapshots)
280-
self._prune_snapshots(snapshots, event_data.cpu_info.created_at)
281282
self._cpu_snapshots.add(snapshot)
283+
self._prune_snapshots(snapshots, self._cpu_snapshots[-1].created_at)
282284

283-
def _snapshot_memory(self, event_data: EventSystemInfoData) -> None:
285+
async def _snapshot_memory(self, event_data: EventSystemInfoData) -> None:
284286
"""Capture a snapshot of the current memory usage.
285287
286288
This method does not perform memory usage measurement. Instead, it just reads the data received through
287289
the `event_data` parameter, which is expected to be supplied by the event manager.
290+
Must be `async` to ensure it is not scheduled to be run in own thread by the event manager, which could cause
291+
race conditions in snapshots manipulation(sorting and pruning).
288292
289293
Args:
290294
event_data: System info data from which memory usage is read.
@@ -330,22 +334,24 @@ def _snapshot_memory(self, event_data: EventSystemInfoData) -> None:
330334
)
331335

332336
snapshots = cast('list[Snapshot]', self._memory_snapshots)
333-
self._prune_snapshots(snapshots, snapshot.created_at)
334337
self._memory_snapshots.add(snapshot)
338+
self._prune_snapshots(snapshots, self._memory_snapshots[-1].created_at)
335339

336340
self._evaluate_memory_load(
337341
event_data.memory_info.current_size,
338342
event_data.memory_info.created_at,
339343
max_memory_size=max_memory_size,
340344
)
341345

342-
def _snapshot_event_loop(self) -> None:
346+
async def _snapshot_event_loop(self) -> None:
343347
"""Capture a snapshot of the current event loop usage.
344348
345349
This method evaluates the event loop's latency by comparing the expected time between snapshots to the actual
346350
time elapsed since the last snapshot. The delay in the snapshot reflects the time deviation due to event loop
347351
overhead - it's calculated by subtracting the expected interval between snapshots from the actual time elapsed
348352
since the last snapshot. If there's no previous snapshot, the delay is considered zero.
353+
Must be `async` to ensure it is not scheduled to be run in own thread by the event manager, which could cause
354+
race conditions in snapshots manipulation(sorting and pruning).
349355
"""
350356
snapshot = EventLoopSnapshot(max_delay=self._max_event_loop_delay, delay=timedelta(seconds=0))
351357
previous_snapshot = self._event_loop_snapshots[-1] if self._event_loop_snapshots else None
@@ -355,14 +361,16 @@ def _snapshot_event_loop(self) -> None:
355361
snapshot.delay = event_loop_delay
356362

357363
snapshots = cast('list[Snapshot]', self._event_loop_snapshots)
358-
self._prune_snapshots(snapshots, snapshot.created_at)
359364
self._event_loop_snapshots.add(snapshot)
365+
self._prune_snapshots(snapshots, self._event_loop_snapshots[-1].created_at)
360366

361-
def _snapshot_client(self) -> None:
367+
async def _snapshot_client(self) -> None:
362368
"""Capture a snapshot of the current API state by checking for rate limit errors (HTTP 429).
363369
364370
Only errors produced by a 2nd retry of the API call are considered for snapshotting since earlier errors may
365371
just be caused by a random spike in the number of requests and do not necessarily signify API overloading.
372+
Must be `async` to ensure it is not scheduled to be run in own thread by the event manager, which could cause
373+
race conditions in snapshots manipulation(sorting and pruning).
366374
"""
367375
client = service_locator.get_storage_client()
368376

@@ -377,8 +385,8 @@ def _snapshot_client(self) -> None:
377385
)
378386

379387
snapshots = cast('list[Snapshot]', self._client_snapshots)
380-
self._prune_snapshots(snapshots, snapshot.created_at)
381388
self._client_snapshots.add(snapshot)
389+
self._prune_snapshots(snapshots, self._client_snapshots[-1].created_at)
382390

383391
def _prune_snapshots(self, snapshots: list[Snapshot], now: datetime) -> None:
384392
"""Remove snapshots that are older than the `self._snapshot_history`.

tests/unit/_autoscaling/test_snapshotter.py

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import time
5+
from bisect import insort
46
from datetime import datetime, timedelta, timezone
57
from logging import getLogger
68
from math import floor
7-
from typing import TYPE_CHECKING, cast
9+
from typing import TYPE_CHECKING, Any, cast
10+
from unittest import mock
811
from unittest.mock import MagicMock
912

1013
import pytest
@@ -219,9 +222,6 @@ async def test_methods_raise_error_when_not_active() -> None:
219222
assert snapshotter.active is True
220223

221224

222-
@pytest.mark.skip(
223-
reason='Flaky due to snapshot pruning boundary condition, see https://github.com/apify/crawlee-python/issues/1734'
224-
)
225225
async def test_snapshot_pruning_removes_outdated_records(
226226
snapshotter: Snapshotter, event_manager: LocalEventManager, default_memory_info: MemoryInfo
227227
) -> None:
@@ -231,17 +231,23 @@ async def test_snapshot_pruning_removes_outdated_records(
231231
# Create timestamps for testing
232232
now = datetime.now(timezone.utc)
233233

234-
events_data = [
235-
EventSystemInfoData(
236-
cpu_info=CpuInfo(used_ratio=0.5, created_at=now - timedelta(hours=delta)),
237-
memory_info=default_memory_info,
238-
)
239-
for delta in [5, 3, 2, 0]
240-
]
234+
def randomly_delayed_insort(*args: Any, **kwargs: Any) -> None:
235+
"""Sort with injected delay to provoke otherwise hard to reproduce race condition."""
236+
time.sleep(0.05)
237+
return insort(*args, **kwargs)
241238

242-
for event_data in events_data:
243-
event_manager.emit(event=Event.SYSTEM_INFO, event_data=event_data)
244-
await event_manager.wait_for_all_listeners_to_complete()
239+
with mock.patch('crawlee._autoscaling.snapshotter.insort', side_effect=randomly_delayed_insort):
240+
events_data = [
241+
EventSystemInfoData(
242+
cpu_info=CpuInfo(used_ratio=0.5, created_at=now - timedelta(hours=delta)),
243+
memory_info=default_memory_info,
244+
)
245+
for delta in [0, 3, 2, 5] # Out of order timestamps. Snapshotter can not rely on natural ordering.
246+
]
247+
248+
for event_data in events_data:
249+
event_manager.emit(event=Event.SYSTEM_INFO, event_data=event_data)
250+
await event_manager.wait_for_all_listeners_to_complete()
245251

246252
cpu_snapshots = cast('list[CpuSnapshot]', snapshotter.get_cpu_sample())
247253

0 commit comments

Comments
 (0)