Skip to content
48 changes: 48 additions & 0 deletions test/asynchronous/test_async_periodic_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright 2026-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Async-only unit tests for AsyncPeriodicExecutor."""

from __future__ import annotations

import sys

sys.path[0:0] = [""]

from test.asynchronous import AsyncUnitTest, unittest

from pymongo.periodic_executor import AsyncPeriodicExecutor


class TestAsyncPeriodicExecutorExceptions(AsyncUnitTest):
async def test_target_exception_stops_executor(self):
call_count = 0

async def target():
nonlocal call_count
call_count += 1
raise RuntimeError("error")

executor = AsyncPeriodicExecutor(
interval=30.0, min_interval=0.01, target=target, name="test"
)
executor.open()
await executor.join(timeout=2)
if executor._task is not None and executor._task.done():
executor._task.exception()
self.assertEqual(call_count, 1, "target should stop after exception")


if __name__ == "__main__":
unittest.main()
130 changes: 130 additions & 0 deletions test/asynchronous/test_periodic_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# Copyright 2026-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unit tests for periodic_executor.py."""

from __future__ import annotations

import asyncio
import sys
import threading
import time

sys.path[0:0] = [""]

from test.asynchronous import AsyncUnitTest, unittest

from pymongo.periodic_executor import AsyncPeriodicExecutor

_IS_SYNC = False


class TestAsyncPeriodicExecutor(AsyncUnitTest):
def _make_executor(self, interval=30.0, min_interval=0.01, target=None, name="test"):
if target is None:

async def target():
return True

executor = AsyncPeriodicExecutor(
interval=interval, min_interval=min_interval, target=target, name=name
)
self.addAsyncCleanup(self._close_executor, executor)
return executor

async def _close_executor(self, executor):
executor.close()
await executor.join(timeout=2)

async def test_join_without_open_is_safe(self):
executor = self._make_executor()
try:
await executor.join(timeout=0.01)
except Exception as e:
self.fail(f"join() raised unexpected Exception {e}")

async def test_target_returning_false_stops_executor(self):
if _IS_SYNC:
ran = threading.Event()
else:
ran = asyncio.Event()

async def target():
ran.set()
return False

executor = self._make_executor(target=target)
executor.open()
await executor.join(timeout=2)
self.assertTrue(ran.is_set(), "target never ran")

async def test_skip_sleep_flag_skips_interval(self):
call_times = []

async def target():
call_times.append(time.monotonic())
if len(call_times) >= 2:
return False
return True

executor = self._make_executor(interval=30.0, min_interval=0.001, target=target)
executor.skip_sleep()
executor.open()
await executor.join(timeout=3)
self.assertGreaterEqual(len(call_times), 2)
self.assertLess(call_times[1] - call_times[0], 5.0)

async def test_wake_causes_early_run(self):
call_count = 0
if _IS_SYNC:
woken = threading.Event()
else:
woken = asyncio.Event()

async def target():
nonlocal call_count
call_count += 1
if call_count == 1:
woken.set()
return call_count < 2

executor = self._make_executor(interval=30.0, min_interval=0.01, target=target)
executor.open()
if _IS_SYNC:
woken.wait(timeout=2)
else:
assert isinstance(woken, asyncio.Event)
await asyncio.wait_for(woken.wait(), timeout=2)
executor.wake()
await executor.join(timeout=3)
self.assertGreaterEqual(call_count, 2)

async def test_open_after_target_returns_false(self):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here as above, prefer nonlocal over the list trick.

called = 0

async def target():
nonlocal called
called += 1
return False

executor = self._make_executor(target=target)
executor.open()
await executor.join(timeout=2)
executor.open()
await executor.join(timeout=2)
self.assertGreaterEqual(called, 2)


if __name__ == "__main__":
unittest.main()
130 changes: 130 additions & 0 deletions test/test_periodic_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# Copyright 2026-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unit tests for periodic_executor.py."""

Comment thread
aclark4life marked this conversation as resolved.
from __future__ import annotations

import asyncio
import sys
import threading
import time

sys.path[0:0] = [""]

from test.synchronous import UnitTest, unittest

from pymongo.periodic_executor import PeriodicExecutor

_IS_SYNC = False


class TestAsyncPeriodicExecutor(UnitTest):
def _make_executor(self, interval=30.0, min_interval=0.01, target=None, name="test"):
if target is None:

def target():
return True

executor = PeriodicExecutor(
interval=interval, min_interval=min_interval, target=target, name=name
)
self.addCleanup(self._close_executor, executor)
return executor

def _close_executor(self, executor):
executor.close()
executor.join(timeout=2)

def test_join_without_open_is_safe(self):
executor = self._make_executor()
try:
executor.join(timeout=0.01)
except Exception as e:
self.fail(f"join() raised unexpected Exception {e}")

def test_target_returning_false_stops_executor(self):
if _IS_SYNC:
ran = threading.Event()
else:
ran = asyncio.Event()

def target():
ran.set()
return False

executor = self._make_executor(target=target)
executor.open()
executor.join(timeout=2)
self.assertTrue(ran.is_set(), "target never ran")

Comment on lines +57 to +71
def test_skip_sleep_flag_skips_interval(self):
call_times = []

def target():
call_times.append(time.monotonic())
if len(call_times) >= 2:
return False
return True

executor = self._make_executor(interval=30.0, min_interval=0.001, target=target)
executor.skip_sleep()
executor.open()
executor.join(timeout=3)
self.assertGreaterEqual(len(call_times), 2)
self.assertLess(call_times[1] - call_times[0], 5.0)

def test_wake_causes_early_run(self):
call_count = 0
if _IS_SYNC:
woken = threading.Event()
else:
woken = asyncio.Event()

def target():
nonlocal call_count
call_count += 1
if call_count == 1:
woken.set()
return call_count < 2

executor = self._make_executor(interval=30.0, min_interval=0.01, target=target)
executor.open()
if _IS_SYNC:
woken.wait(timeout=2)
else:
assert isinstance(woken, asyncio.Event)
asyncio.wait_for(woken.wait(), timeout=2)
executor.wake()
executor.join(timeout=3)
self.assertGreaterEqual(call_count, 2)

def test_open_after_target_returns_false(self):
called = 0

def target():
nonlocal called
called += 1
return False

executor = self._make_executor(target=target)
executor.open()
executor.join(timeout=2)
executor.open()
executor.join(timeout=2)
self.assertGreaterEqual(called, 2)


if __name__ == "__main__":
unittest.main()
1 change: 1 addition & 0 deletions tools/synchro.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ def async_only_test(f: str) -> bool:
"test_async_loop_safety.py",
"test_async_contextvars_reset.py",
"test_async_loop_unblocked.py",
"test_async_periodic_executor.py",
]


Expand Down
Loading