diff --git a/test/asynchronous/test_async_periodic_executor.py b/test/asynchronous/test_async_periodic_executor.py new file mode 100644 index 0000000000..f3820be37a --- /dev/null +++ b/test/asynchronous/test_async_periodic_executor.py @@ -0,0 +1,48 @@ +# Copyright 2026-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Async-only unit tests for AsyncPeriodicExecutor.""" + +from __future__ import annotations + +import sys + +sys.path[0:0] = [""] + +from test.asynchronous import AsyncUnitTest, unittest + +from pymongo.periodic_executor import AsyncPeriodicExecutor + + +class TestAsyncPeriodicExecutorExceptions(AsyncUnitTest): + async def test_target_exception_stops_executor(self): + call_count = 0 + + async def target(): + nonlocal call_count + call_count += 1 + raise RuntimeError("error") + + executor = AsyncPeriodicExecutor( + interval=30.0, min_interval=0.01, target=target, name="test" + ) + executor.open() + await executor.join(timeout=2) + if executor._task is not None and executor._task.done(): + executor._task.exception() + self.assertEqual(call_count, 1, "target should stop after exception") + + +if __name__ == "__main__": + unittest.main() diff --git a/test/asynchronous/test_periodic_executor.py b/test/asynchronous/test_periodic_executor.py new file mode 100644 index 0000000000..e56a9f111c --- /dev/null +++ b/test/asynchronous/test_periodic_executor.py @@ -0,0 +1,130 @@ +# Copyright 2026-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for periodic_executor.py.""" + +from __future__ import annotations + +import asyncio +import sys +import threading +import time + +sys.path[0:0] = [""] + +from test.asynchronous import AsyncUnitTest, unittest + +from pymongo.periodic_executor import AsyncPeriodicExecutor + +_IS_SYNC = False + + +class TestAsyncPeriodicExecutor(AsyncUnitTest): + def _make_executor(self, interval=30.0, min_interval=0.01, target=None, name="test"): + if target is None: + + async def target(): + return True + + executor = AsyncPeriodicExecutor( + interval=interval, min_interval=min_interval, target=target, name=name + ) + self.addAsyncCleanup(self._close_executor, executor) + return executor + + async def _close_executor(self, executor): + executor.close() + await executor.join(timeout=2) + + async def test_join_without_open_is_safe(self): + executor = self._make_executor() + try: + await executor.join(timeout=0.01) + except Exception as e: + self.fail(f"join() raised unexpected Exception {e}") + + async def test_target_returning_false_stops_executor(self): + if _IS_SYNC: + ran = threading.Event() + else: + ran = asyncio.Event() + + async def target(): + ran.set() + return False + + executor = self._make_executor(target=target) + executor.open() + await executor.join(timeout=2) + self.assertTrue(ran.is_set(), "target never ran") + + async def test_skip_sleep_flag_skips_interval(self): + call_times = [] + + async def target(): + call_times.append(time.monotonic()) + if len(call_times) >= 2: + return False + return True + + executor = self._make_executor(interval=30.0, min_interval=0.001, target=target) + executor.skip_sleep() + executor.open() + await executor.join(timeout=3) + self.assertGreaterEqual(len(call_times), 2) + self.assertLess(call_times[1] - call_times[0], 5.0) + + async def test_wake_causes_early_run(self): + call_count = 0 + if _IS_SYNC: + woken = threading.Event() + else: + woken = asyncio.Event() + + async def target(): + nonlocal call_count + call_count += 1 + if call_count == 1: + woken.set() + return call_count < 2 + + executor = self._make_executor(interval=30.0, min_interval=0.01, target=target) + executor.open() + if _IS_SYNC: + woken.wait(timeout=2) + else: + assert isinstance(woken, asyncio.Event) + await asyncio.wait_for(woken.wait(), timeout=2) + executor.wake() + await executor.join(timeout=3) + self.assertGreaterEqual(call_count, 2) + + async def test_open_after_target_returns_false(self): + called = 0 + + async def target(): + nonlocal called + called += 1 + return False + + executor = self._make_executor(target=target) + executor.open() + await executor.join(timeout=2) + executor.open() + await executor.join(timeout=2) + self.assertGreaterEqual(called, 2) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_periodic_executor.py b/test/test_periodic_executor.py new file mode 100644 index 0000000000..cb572302a9 --- /dev/null +++ b/test/test_periodic_executor.py @@ -0,0 +1,130 @@ +# Copyright 2026-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for periodic_executor.py.""" + +from __future__ import annotations + +import asyncio +import sys +import threading +import time + +sys.path[0:0] = [""] + +from test.synchronous import UnitTest, unittest + +from pymongo.periodic_executor import PeriodicExecutor + +_IS_SYNC = False + + +class TestAsyncPeriodicExecutor(UnitTest): + def _make_executor(self, interval=30.0, min_interval=0.01, target=None, name="test"): + if target is None: + + def target(): + return True + + executor = PeriodicExecutor( + interval=interval, min_interval=min_interval, target=target, name=name + ) + self.addCleanup(self._close_executor, executor) + return executor + + def _close_executor(self, executor): + executor.close() + executor.join(timeout=2) + + def test_join_without_open_is_safe(self): + executor = self._make_executor() + try: + executor.join(timeout=0.01) + except Exception as e: + self.fail(f"join() raised unexpected Exception {e}") + + def test_target_returning_false_stops_executor(self): + if _IS_SYNC: + ran = threading.Event() + else: + ran = asyncio.Event() + + def target(): + ran.set() + return False + + executor = self._make_executor(target=target) + executor.open() + executor.join(timeout=2) + self.assertTrue(ran.is_set(), "target never ran") + + def test_skip_sleep_flag_skips_interval(self): + call_times = [] + + def target(): + call_times.append(time.monotonic()) + if len(call_times) >= 2: + return False + return True + + executor = self._make_executor(interval=30.0, min_interval=0.001, target=target) + executor.skip_sleep() + executor.open() + executor.join(timeout=3) + self.assertGreaterEqual(len(call_times), 2) + self.assertLess(call_times[1] - call_times[0], 5.0) + + def test_wake_causes_early_run(self): + call_count = 0 + if _IS_SYNC: + woken = threading.Event() + else: + woken = asyncio.Event() + + def target(): + nonlocal call_count + call_count += 1 + if call_count == 1: + woken.set() + return call_count < 2 + + executor = self._make_executor(interval=30.0, min_interval=0.01, target=target) + executor.open() + if _IS_SYNC: + woken.wait(timeout=2) + else: + assert isinstance(woken, asyncio.Event) + asyncio.wait_for(woken.wait(), timeout=2) + executor.wake() + executor.join(timeout=3) + self.assertGreaterEqual(call_count, 2) + + def test_open_after_target_returns_false(self): + called = 0 + + def target(): + nonlocal called + called += 1 + return False + + executor = self._make_executor(target=target) + executor.open() + executor.join(timeout=2) + executor.open() + executor.join(timeout=2) + self.assertGreaterEqual(called, 2) + + +if __name__ == "__main__": + unittest.main() diff --git a/tools/synchro.py b/tools/synchro.py index ed794c5963..c00827885d 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -190,6 +190,7 @@ def async_only_test(f: str) -> bool: "test_async_loop_safety.py", "test_async_contextvars_reset.py", "test_async_loop_unblocked.py", + "test_async_periodic_executor.py", ]