diff --git a/dapr/actor/__init__.py b/dapr/actor/__init__.py index bf21f488c..7c1e1c1c2 100644 --- a/dapr/actor/__init__.py +++ b/dapr/actor/__init__.py @@ -17,6 +17,7 @@ from dapr.actor.client.proxy import ActorProxy, ActorProxyFactory from dapr.actor.id import ActorId from dapr.actor.runtime.actor import Actor +from dapr.actor.runtime.failure_policy import ActorReminderFailurePolicy from dapr.actor.runtime.remindable import Remindable from dapr.actor.runtime.runtime import ActorRuntime @@ -26,6 +27,7 @@ 'ActorProxyFactory', 'ActorId', 'Actor', + 'ActorReminderFailurePolicy', 'ActorRuntime', 'Remindable', 'actormethod', diff --git a/dapr/actor/runtime/_failure_policy.py b/dapr/actor/runtime/_failure_policy.py new file mode 100644 index 000000000..e650ca469 --- /dev/null +++ b/dapr/actor/runtime/_failure_policy.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2026 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +# Backward-compatible shim — import from the public module instead. +from dapr.actor.runtime.failure_policy import ActorReminderFailurePolicy + +__all__ = ['ActorReminderFailurePolicy'] diff --git a/dapr/actor/runtime/_reminder_data.py b/dapr/actor/runtime/_reminder_data.py index 5453b8162..2eec70e5a 100644 --- a/dapr/actor/runtime/_reminder_data.py +++ b/dapr/actor/runtime/_reminder_data.py @@ -17,6 +17,8 @@ from datetime import timedelta from typing import Any, Dict, Optional +from dapr.actor.runtime.failure_policy import ActorReminderFailurePolicy + class ActorReminderData: """The class that holds actor reminder data. @@ -28,32 +30,37 @@ class ActorReminderData: for the first time. period: the time interval between reminder invocations after the first invocation. + failure_policy: the optional policy for handling reminder failures. """ def __init__( self, reminder_name: str, - state: Optional[bytes], + state: bytes, due_time: timedelta, period: Optional[timedelta] = None, ttl: Optional[timedelta] = None, + failure_policy: Optional[ActorReminderFailurePolicy] = None, ): """Creates new :class:`ActorReminderData` instance. Args: reminder_name (str): the name of Actor reminder. - state (bytes, str): the state data passed to + state (bytes): the state data passed to receive_reminder callback. due_time (datetime.timedelta): the amount of time to delay before invoking the reminder for the first time. period (datetime.timedelta): the time interval between reminder invocations after the first invocation. ttl (Optional[datetime.timedelta]): the time interval before the reminder stops firing. + failure_policy (Optional[ActorReminderFailurePolicy]): the policy for handling + reminder failures. If not set, the Dapr runtime default applies (3 retries). """ self._reminder_name = reminder_name self._due_time = due_time self._period = period self._ttl = ttl + self._failure_policy = failure_policy if not isinstance(state, bytes): raise ValueError(f'only bytes are allowed for state: {type(state)}') @@ -85,11 +92,14 @@ def ttl(self) -> Optional[timedelta]: """Gets ttl of Actor Reminder.""" return self._ttl + @property + def failure_policy(self) -> Optional[ActorReminderFailurePolicy]: + """Gets the failure policy of Actor Reminder.""" + return self._failure_policy + def as_dict(self) -> Dict[str, Any]: """Gets :class:`ActorReminderData` as a dict object.""" - encoded_state = None - if self._state is not None: - encoded_state = base64.b64encode(self._state) + encoded_state = base64.b64encode(self._state) reminderDict: Dict[str, Any] = { 'reminderName': self._reminder_name, 'dueTime': self._due_time, @@ -100,14 +110,18 @@ def as_dict(self) -> Dict[str, Any]: if self._ttl is not None: reminderDict.update({'ttl': self._ttl}) + if self._failure_policy is not None: + reminderDict.update({'failurePolicy': self._failure_policy.as_dict()}) + return reminderDict @classmethod def from_dict(cls, reminder_name: str, obj: Dict[str, Any]) -> 'ActorReminderData': """Creates :class:`ActorReminderData` object from dict object.""" b64encoded_state = obj.get('data') - state_bytes = None - if b64encoded_state is not None and len(b64encoded_state) > 0: + if b64encoded_state is None or len(b64encoded_state) == 0: + state_bytes = b'' + else: state_bytes = base64.b64decode(b64encoded_state) if 'ttl' in obj: return ActorReminderData( diff --git a/dapr/actor/runtime/actor.py b/dapr/actor/runtime/actor.py index fab02fc70..a666b06e1 100644 --- a/dapr/actor/runtime/actor.py +++ b/dapr/actor/runtime/actor.py @@ -22,6 +22,7 @@ from dapr.actor.runtime._reminder_data import ActorReminderData from dapr.actor.runtime._timer_data import TIMER_CALLBACK, ActorTimerData from dapr.actor.runtime.context import ActorRuntimeContext +from dapr.actor.runtime.failure_policy import ActorReminderFailurePolicy from dapr.actor.runtime.state_manager import ActorStateManager @@ -113,6 +114,7 @@ async def register_reminder( due_time: timedelta, period: Optional[timedelta] = None, ttl: Optional[timedelta] = None, + failure_policy: Optional[ActorReminderFailurePolicy] = None, ) -> None: """Registers actor reminder. @@ -129,11 +131,16 @@ async def register_reminder( state (bytes): the user state passed to the reminder invocation. due_time (datetime.timedelta): the amount of time to delay before invoking the reminder for the first time. - period (datetime.timedelta): the time interval between reminder invocations after - the first invocation. - ttl (datetime.timedelta): the time interval before the reminder stops firing + period (Optional[datetime.timedelta]): the optional time interval between reminder + invocations after the first invocation. If not set, the Dapr runtime behavior + for one-off or non-periodic reminders applies. + ttl (Optional[datetime.timedelta]): the optional time interval before the reminder + stops firing. If not set, the Dapr runtime default behavior applies. + failure_policy (Optional[ActorReminderFailurePolicy]): the optional policy for + handling reminder failures. If not set, the Dapr runtime default applies + (3 retries per tick). """ - reminder = ActorReminderData(name, state, due_time, period, ttl) + reminder = ActorReminderData(name, state, due_time, period, ttl, failure_policy) req_body = self._runtime_ctx.message_serializer.serialize(reminder.as_dict()) await self._runtime_ctx.dapr_client.register_reminder( self._runtime_ctx.actor_type_info.type_name, self.id.id, name, req_body diff --git a/dapr/actor/runtime/failure_policy.py b/dapr/actor/runtime/failure_policy.py new file mode 100644 index 000000000..3b4576ac4 --- /dev/null +++ b/dapr/actor/runtime/failure_policy.py @@ -0,0 +1,105 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2026 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from datetime import timedelta +from typing import Any, Dict, Optional + + +class ActorReminderFailurePolicy: + """Defines what happens when an actor reminder fails to trigger. + + Use :meth:`drop_policy` to discard failed ticks without retrying, or + :meth:`constant_policy` to retry at a fixed interval. + + Attributes: + drop: whether this is a drop (no-retry) policy. + interval: the retry interval for a constant policy. + max_retries: the maximum number of retries for a constant policy. + """ + + def __init__( + self, + *, + drop: bool = False, + interval: Optional[timedelta] = None, + max_retries: Optional[int] = None, + ): + """Creates a new :class:`ActorReminderFailurePolicy` instance. + + Args: + drop (bool): if True, creates a drop policy that discards the reminder + tick on failure without retrying. Cannot be combined with interval + or max_retries. + interval (datetime.timedelta): the retry interval for a constant policy. + max_retries (int): the maximum number of retries for a constant policy. + If not set, retries indefinitely. + + Raises: + ValueError: if drop is combined with interval or max_retries, or if + neither drop=True nor at least one of interval/max_retries is provided. + """ + if drop and (interval is not None or max_retries is not None): + raise ValueError('drop policy cannot be combined with interval or max_retries') + if not drop and interval is None and max_retries is None: + raise ValueError('specify either drop=True or at least one of interval or max_retries') + self._drop = drop + self._interval = interval + self._max_retries = max_retries + + @classmethod + def drop_policy(cls) -> 'ActorReminderFailurePolicy': + """Returns a policy that drops the reminder tick on failure (no retry).""" + return cls(drop=True) + + @classmethod + def constant_policy( + cls, + interval: Optional[timedelta] = None, + max_retries: Optional[int] = None, + ) -> 'ActorReminderFailurePolicy': + """Returns a policy that retries at a constant interval on failure. + + Args: + interval (datetime.timedelta): the time between retry attempts. + max_retries (int): the maximum number of retry attempts. If not set, + retries indefinitely. + """ + return cls(interval=interval, max_retries=max_retries) + + @property + def drop(self) -> bool: + """Returns True if this is a drop policy.""" + return self._drop + + @property + def interval(self) -> Optional[timedelta]: + """Returns the retry interval for a constant policy.""" + return self._interval + + @property + def max_retries(self) -> Optional[int]: + """Returns the maximum retries for a constant policy.""" + return self._max_retries + + def as_dict(self) -> Dict[str, Any]: + """Gets :class:`ActorReminderFailurePolicy` as a dict object.""" + if self._drop: + return {'drop': {}} + d: Dict[str, Any] = {} + if self._interval is not None: + d['interval'] = self._interval + if self._max_retries is not None: + d['maxRetries'] = self._max_retries + return {'constant': d} diff --git a/dapr/actor/runtime/mock_actor.py b/dapr/actor/runtime/mock_actor.py index 82170672e..28c48b858 100644 --- a/dapr/actor/runtime/mock_actor.py +++ b/dapr/actor/runtime/mock_actor.py @@ -20,6 +20,7 @@ from dapr.actor.runtime._reminder_data import ActorReminderData from dapr.actor.runtime._timer_data import TIMER_CALLBACK, ActorTimerData from dapr.actor.runtime.actor import Actor +from dapr.actor.runtime.failure_policy import ActorReminderFailurePolicy from dapr.actor.runtime.mock_state_manager import MockStateManager @@ -88,6 +89,7 @@ async def register_reminder( due_time: timedelta, period: Optional[timedelta] = None, ttl: Optional[timedelta] = None, + failure_policy: Optional[ActorReminderFailurePolicy] = None, ) -> None: """Adds actor reminder to self._state_manager._mock_reminders. @@ -96,11 +98,16 @@ async def register_reminder( state (bytes): the user state passed to the reminder invocation. due_time (datetime.timedelta): the amount of time to delay before invoking the reminder for the first time. - period (datetime.timedelta): the time interval between reminder invocations after - the first invocation. - ttl (datetime.timedelta): the time interval before the reminder stops firing + period (Optional[datetime.timedelta]): the optional time interval between reminder + invocations after the first invocation. If None, the reminder uses the Dapr + runtime behavior for one-off or non-periodic reminders. + ttl (Optional[datetime.timedelta]): the optional time interval before the reminder + stops firing. If None, no explicit TTL is set. + failure_policy (Optional[ActorReminderFailurePolicy]): the optional policy for + handling reminder failures. If not set, the Dapr runtime default applies + (3 retries per tick). """ - reminder = ActorReminderData(name, state, due_time, period, ttl) + reminder = ActorReminderData(name, state, due_time, period, ttl, failure_policy) self._state_manager._mock_reminders[name] = reminder # type: ignore async def unregister_reminder(self, name: str) -> None: diff --git a/tests/actor/test_actor.py b/tests/actor/test_actor.py index 7a7bee2d2..1110fd146 100644 --- a/tests/actor/test_actor.py +++ b/tests/actor/test_actor.py @@ -21,6 +21,7 @@ from dapr.actor.runtime._type_information import ActorTypeInformation from dapr.actor.runtime.config import ActorRuntimeConfig from dapr.actor.runtime.context import ActorRuntimeContext +from dapr.actor.runtime.failure_policy import ActorReminderFailurePolicy from dapr.actor.runtime.runtime import ActorRuntime from dapr.conf import settings from dapr.serializers import DefaultJSONSerializer @@ -151,6 +152,65 @@ def test_register_reminder(self): 'FakeSimpleReminderActor', 'test_id', 'test_reminder' ) + @mock.patch( + 'tests.actor.fake_client.FakeDaprActorClient.register_reminder', + new=_async_mock(return_value=b'"ok"'), + ) + def test_register_reminder_with_failure_policy(self): + test_actor_id = ActorId('test_id') + test_type_info = ActorTypeInformation.create(FakeSimpleReminderActor) + test_client = FakeDaprActorClient + ctx = ActorRuntimeContext(test_type_info, self._serializer, self._serializer, test_client) + test_actor = FakeSimpleReminderActor(ctx, test_actor_id) + + _run( + test_actor.register_reminder( + 'test_reminder', + b'reminder_message', + timedelta(seconds=1), + timedelta(seconds=1), + failure_policy=ActorReminderFailurePolicy.drop_policy(), + ) + ) + test_client.register_reminder.mock.assert_called_once() + test_client.register_reminder.mock.assert_called_with( + 'FakeSimpleReminderActor', + 'test_id', + 'test_reminder', + b'{"reminderName":"test_reminder","dueTime":"0h0m1s0ms0\\u03bcs","period":"0h0m1s0ms0\\u03bcs","data":"cmVtaW5kZXJfbWVzc2FnZQ==","failurePolicy":{"drop":{}}}', # noqa E501 + ) + + @mock.patch( + 'tests.actor.fake_client.FakeDaprActorClient.register_reminder', + new=_async_mock(return_value=b'"ok"'), + ) + def test_register_reminder_with_constant_failure_policy(self): + test_actor_id = ActorId('test_id') + test_type_info = ActorTypeInformation.create(FakeSimpleReminderActor) + test_client = FakeDaprActorClient + ctx = ActorRuntimeContext(test_type_info, self._serializer, self._serializer, test_client) + test_actor = FakeSimpleReminderActor(ctx, test_actor_id) + + _run( + test_actor.register_reminder( + 'test_reminder', + b'reminder_message', + timedelta(seconds=1), + timedelta(seconds=1), + failure_policy=ActorReminderFailurePolicy.constant_policy( + interval=timedelta(seconds=2), + max_retries=5, + ), + ) + ) + test_client.register_reminder.mock.assert_called_once() + test_client.register_reminder.mock.assert_called_with( + 'FakeSimpleReminderActor', + 'test_id', + 'test_reminder', + b'{"reminderName":"test_reminder","dueTime":"0h0m1s0ms0\\u03bcs","period":"0h0m1s0ms0\\u03bcs","data":"cmVtaW5kZXJfbWVzc2FnZQ==","failurePolicy":{"constant":{"interval":"0h0m2s0ms0\\u03bcs","maxRetries":5}}}', # noqa E501 + ) + @mock.patch( 'tests.actor.fake_client.FakeDaprActorClient.register_timer', new=_async_mock(return_value=b'"ok"'), diff --git a/tests/actor/test_failure_policy.py b/tests/actor/test_failure_policy.py new file mode 100644 index 000000000..12db1c4b8 --- /dev/null +++ b/tests/actor/test_failure_policy.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2026 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import unittest +from datetime import timedelta + +from dapr.actor.runtime.failure_policy import ActorReminderFailurePolicy + + +class ActorReminderFailurePolicyTests(unittest.TestCase): + # --- drop_policy --- + + def test_drop_policy_factory(self): + p = ActorReminderFailurePolicy.drop_policy() + self.assertTrue(p.drop) + self.assertIsNone(p.interval) + self.assertIsNone(p.max_retries) + + def test_drop_policy_as_dict(self): + p = ActorReminderFailurePolicy.drop_policy() + self.assertEqual({'drop': {}}, p.as_dict()) + + # --- constant_policy --- + + def test_constant_policy_interval_and_max_retries(self): + p = ActorReminderFailurePolicy.constant_policy(interval=timedelta(seconds=5), max_retries=3) + self.assertFalse(p.drop) + self.assertEqual(timedelta(seconds=5), p.interval) + self.assertEqual(3, p.max_retries) + + def test_constant_policy_as_dict_full(self): + p = ActorReminderFailurePolicy.constant_policy(interval=timedelta(seconds=5), max_retries=3) + self.assertEqual( + {'constant': {'interval': timedelta(seconds=5), 'maxRetries': 3}}, p.as_dict() + ) + + def test_constant_policy_interval_only(self): + p = ActorReminderFailurePolicy.constant_policy(interval=timedelta(seconds=10)) + self.assertEqual({'constant': {'interval': timedelta(seconds=10)}}, p.as_dict()) + + def test_constant_policy_max_retries_only(self): + p = ActorReminderFailurePolicy.constant_policy(max_retries=5) + self.assertEqual({'constant': {'maxRetries': 5}}, p.as_dict()) + + # --- validation errors --- + + def test_drop_with_interval_raises(self): + with self.assertRaises(ValueError): + ActorReminderFailurePolicy(drop=True, interval=timedelta(seconds=1)) + + def test_drop_with_max_retries_raises(self): + with self.assertRaises(ValueError): + ActorReminderFailurePolicy(drop=True, max_retries=3) + + def test_drop_with_both_raises(self): + with self.assertRaises(ValueError): + ActorReminderFailurePolicy(drop=True, interval=timedelta(seconds=1), max_retries=3) + + def test_no_policy_specified_raises(self): + with self.assertRaises(ValueError): + ActorReminderFailurePolicy() + + # --- direct constructor --- + + def test_direct_drop_constructor(self): + p = ActorReminderFailurePolicy(drop=True) + self.assertEqual({'drop': {}}, p.as_dict()) + + def test_direct_constant_constructor(self): + p = ActorReminderFailurePolicy(interval=timedelta(seconds=2), max_retries=1) + self.assertEqual( + {'constant': {'interval': timedelta(seconds=2), 'maxRetries': 1}}, p.as_dict() + ) diff --git a/tests/actor/test_reminder_data.py b/tests/actor/test_reminder_data.py index e142217c9..9da42795b 100644 --- a/tests/actor/test_reminder_data.py +++ b/tests/actor/test_reminder_data.py @@ -17,6 +17,7 @@ from datetime import timedelta from dapr.actor.runtime._reminder_data import ActorReminderData +from dapr.actor.runtime.failure_policy import ActorReminderFailurePolicy class ActorReminderTests(unittest.TestCase): @@ -80,3 +81,60 @@ def test_from_dict(self): self.assertEqual(timedelta(seconds=2), reminder.period) self.assertEqual(timedelta(seconds=3), reminder.ttl) self.assertEqual(b'reminder_state', reminder.state) + + def test_no_failure_policy(self): + reminder = ActorReminderData( + 'test_reminder', + b'reminder_state', + timedelta(seconds=1), + timedelta(seconds=2), + ) + result = reminder.as_dict() + self.assertNotIn('failurePolicy', result) + self.assertIsNone(reminder.failure_policy) + + def test_drop_failure_policy_as_dict(self): + policy = ActorReminderFailurePolicy.drop_policy() + reminder = ActorReminderData( + 'test_reminder', + b'reminder_state', + timedelta(seconds=1), + timedelta(seconds=2), + failure_policy=policy, + ) + result = reminder.as_dict() + self.assertIn('failurePolicy', result) + self.assertEqual({'drop': {}}, result['failurePolicy']) + + def test_constant_failure_policy_as_dict(self): + policy = ActorReminderFailurePolicy.constant_policy( + interval=timedelta(seconds=5), max_retries=3 + ) + reminder = ActorReminderData( + 'test_reminder', + b'reminder_state', + timedelta(seconds=1), + timedelta(seconds=2), + failure_policy=policy, + ) + result = reminder.as_dict() + self.assertIn('failurePolicy', result) + self.assertEqual( + {'constant': {'interval': timedelta(seconds=5), 'maxRetries': 3}}, + result['failurePolicy'], + ) + + def test_failure_policy_alongside_ttl(self): + policy = ActorReminderFailurePolicy.drop_policy() + reminder = ActorReminderData( + 'test_reminder', + b'reminder_state', + timedelta(seconds=1), + timedelta(seconds=2), + ttl=timedelta(seconds=60), + failure_policy=policy, + ) + result = reminder.as_dict() + self.assertIn('ttl', result) + self.assertIn('failurePolicy', result) + self.assertEqual({'drop': {}}, result['failurePolicy'])