Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dapr/actor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from dapr.actor.client.proxy import ActorProxy, ActorProxyFactory
from dapr.actor.id import ActorId
from dapr.actor.runtime.actor import Actor
from dapr.actor.runtime.failure_policy import ActorReminderFailurePolicy
from dapr.actor.runtime.remindable import Remindable
from dapr.actor.runtime.runtime import ActorRuntime

Expand All @@ -26,6 +27,7 @@
'ActorProxyFactory',
'ActorId',
'Actor',
'ActorReminderFailurePolicy',
'ActorRuntime',
'Remindable',
'actormethod',
Expand Down
19 changes: 19 additions & 0 deletions dapr/actor/runtime/_failure_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-

"""
Copyright 2026 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

# Backward-compatible shim — import from the public module instead.
from dapr.actor.runtime.failure_policy import ActorReminderFailurePolicy

__all__ = ['ActorReminderFailurePolicy']
28 changes: 21 additions & 7 deletions dapr/actor/runtime/_reminder_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from datetime import timedelta
from typing import Any, Dict, Optional

from dapr.actor.runtime.failure_policy import ActorReminderFailurePolicy


class ActorReminderData:
"""The class that holds actor reminder data.
Expand All @@ -28,32 +30,37 @@ class ActorReminderData:
for the first time.
period: the time interval between reminder invocations after
the first invocation.
failure_policy: the optional policy for handling reminder failures.
"""

def __init__(
self,
reminder_name: str,
state: Optional[bytes],
state: bytes,
due_time: timedelta,
period: Optional[timedelta] = None,
ttl: Optional[timedelta] = None,
failure_policy: Optional[ActorReminderFailurePolicy] = None,
):
"""Creates new :class:`ActorReminderData` instance.

Args:
reminder_name (str): the name of Actor reminder.
state (bytes, str): the state data passed to
state (bytes): the state data passed to
receive_reminder callback.
due_time (datetime.timedelta): the amount of time to delay before
invoking the reminder for the first time.
period (datetime.timedelta): the time interval between reminder
invocations after the first invocation.
ttl (Optional[datetime.timedelta]): the time interval before the reminder stops firing.
failure_policy (Optional[ActorReminderFailurePolicy]): the policy for handling
reminder failures. If not set, the Dapr runtime default applies (3 retries).
"""
self._reminder_name = reminder_name
self._due_time = due_time
self._period = period
self._ttl = ttl
self._failure_policy = failure_policy

if not isinstance(state, bytes):
raise ValueError(f'only bytes are allowed for state: {type(state)}')
Expand Down Expand Up @@ -85,11 +92,14 @@ def ttl(self) -> Optional[timedelta]:
"""Gets ttl of Actor Reminder."""
return self._ttl

@property
def failure_policy(self) -> Optional[ActorReminderFailurePolicy]:
"""Gets the failure policy of Actor Reminder."""
return self._failure_policy

def as_dict(self) -> Dict[str, Any]:
"""Gets :class:`ActorReminderData` as a dict object."""
encoded_state = None
if self._state is not None:
encoded_state = base64.b64encode(self._state)
encoded_state = base64.b64encode(self._state)
reminderDict: Dict[str, Any] = {
'reminderName': self._reminder_name,
'dueTime': self._due_time,
Expand All @@ -100,14 +110,18 @@ def as_dict(self) -> Dict[str, Any]:
if self._ttl is not None:
reminderDict.update({'ttl': self._ttl})

if self._failure_policy is not None:
reminderDict.update({'failurePolicy': self._failure_policy.as_dict()})

return reminderDict

@classmethod
def from_dict(cls, reminder_name: str, obj: Dict[str, Any]) -> 'ActorReminderData':
"""Creates :class:`ActorReminderData` object from dict object."""
b64encoded_state = obj.get('data')
state_bytes = None
if b64encoded_state is not None and len(b64encoded_state) > 0:
if b64encoded_state is None or len(b64encoded_state) == 0:
state_bytes = b''
else:
state_bytes = base64.b64decode(b64encoded_state)
if 'ttl' in obj:
return ActorReminderData(
Expand Down
15 changes: 11 additions & 4 deletions dapr/actor/runtime/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from dapr.actor.runtime._reminder_data import ActorReminderData
from dapr.actor.runtime._timer_data import TIMER_CALLBACK, ActorTimerData
from dapr.actor.runtime.context import ActorRuntimeContext
from dapr.actor.runtime.failure_policy import ActorReminderFailurePolicy
from dapr.actor.runtime.state_manager import ActorStateManager


Expand Down Expand Up @@ -113,6 +114,7 @@ async def register_reminder(
due_time: timedelta,
period: Optional[timedelta] = None,
ttl: Optional[timedelta] = None,
failure_policy: Optional[ActorReminderFailurePolicy] = None,
) -> None:
"""Registers actor reminder.

Expand All @@ -129,11 +131,16 @@ async def register_reminder(
state (bytes): the user state passed to the reminder invocation.
due_time (datetime.timedelta): the amount of time to delay before invoking the reminder
for the first time.
period (datetime.timedelta): the time interval between reminder invocations after
the first invocation.
ttl (datetime.timedelta): the time interval before the reminder stops firing
period (Optional[datetime.timedelta]): the optional time interval between reminder
invocations after the first invocation. If not set, the Dapr runtime behavior
for one-off or non-periodic reminders applies.
ttl (Optional[datetime.timedelta]): the optional time interval before the reminder
stops firing. If not set, the Dapr runtime default behavior applies.
failure_policy (Optional[ActorReminderFailurePolicy]): the optional policy for
handling reminder failures. If not set, the Dapr runtime default applies
(3 retries per tick).
"""
reminder = ActorReminderData(name, state, due_time, period, ttl)
reminder = ActorReminderData(name, state, due_time, period, ttl, failure_policy)
req_body = self._runtime_ctx.message_serializer.serialize(reminder.as_dict())
await self._runtime_ctx.dapr_client.register_reminder(
self._runtime_ctx.actor_type_info.type_name, self.id.id, name, req_body
Expand Down
105 changes: 105 additions & 0 deletions dapr/actor/runtime/failure_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-

"""
Copyright 2026 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from datetime import timedelta
from typing import Any, Dict, Optional


class ActorReminderFailurePolicy:
"""Defines what happens when an actor reminder fails to trigger.

Use :meth:`drop_policy` to discard failed ticks without retrying, or
:meth:`constant_policy` to retry at a fixed interval.

Attributes:
drop: whether this is a drop (no-retry) policy.
interval: the retry interval for a constant policy.
max_retries: the maximum number of retries for a constant policy.
"""

def __init__(
self,
*,
drop: bool = False,
interval: Optional[timedelta] = None,
max_retries: Optional[int] = None,
):
"""Creates a new :class:`ActorReminderFailurePolicy` instance.

Args:
drop (bool): if True, creates a drop policy that discards the reminder
tick on failure without retrying. Cannot be combined with interval
or max_retries.
interval (datetime.timedelta): the retry interval for a constant policy.
max_retries (int): the maximum number of retries for a constant policy.
If not set, retries indefinitely.

Raises:
ValueError: if drop is combined with interval or max_retries, or if
neither drop=True nor at least one of interval/max_retries is provided.
"""
if drop and (interval is not None or max_retries is not None):
raise ValueError('drop policy cannot be combined with interval or max_retries')
if not drop and interval is None and max_retries is None:
raise ValueError('specify either drop=True or at least one of interval or max_retries')
self._drop = drop
self._interval = interval
self._max_retries = max_retries

@classmethod
def drop_policy(cls) -> 'ActorReminderFailurePolicy':
"""Returns a policy that drops the reminder tick on failure (no retry)."""
return cls(drop=True)

@classmethod
def constant_policy(
cls,
interval: Optional[timedelta] = None,
max_retries: Optional[int] = None,
) -> 'ActorReminderFailurePolicy':
"""Returns a policy that retries at a constant interval on failure.

Args:
interval (datetime.timedelta): the time between retry attempts.
max_retries (int): the maximum number of retry attempts. If not set,
retries indefinitely.
"""
return cls(interval=interval, max_retries=max_retries)

@property
def drop(self) -> bool:
"""Returns True if this is a drop policy."""
return self._drop

@property
def interval(self) -> Optional[timedelta]:
"""Returns the retry interval for a constant policy."""
return self._interval

@property
def max_retries(self) -> Optional[int]:
"""Returns the maximum retries for a constant policy."""
return self._max_retries

def as_dict(self) -> Dict[str, Any]:
"""Gets :class:`ActorReminderFailurePolicy` as a dict object."""
if self._drop:
return {'drop': {}}
d: Dict[str, Any] = {}
if self._interval is not None:
d['interval'] = self._interval
if self._max_retries is not None:
d['maxRetries'] = self._max_retries
return {'constant': d}
15 changes: 11 additions & 4 deletions dapr/actor/runtime/mock_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from dapr.actor.runtime._reminder_data import ActorReminderData
from dapr.actor.runtime._timer_data import TIMER_CALLBACK, ActorTimerData
from dapr.actor.runtime.actor import Actor
from dapr.actor.runtime.failure_policy import ActorReminderFailurePolicy
from dapr.actor.runtime.mock_state_manager import MockStateManager


Expand Down Expand Up @@ -88,6 +89,7 @@ async def register_reminder(
due_time: timedelta,
period: Optional[timedelta] = None,
ttl: Optional[timedelta] = None,
failure_policy: Optional[ActorReminderFailurePolicy] = None,
) -> None:
"""Adds actor reminder to self._state_manager._mock_reminders.

Expand All @@ -96,11 +98,16 @@ async def register_reminder(
state (bytes): the user state passed to the reminder invocation.
due_time (datetime.timedelta): the amount of time to delay before invoking the reminder
for the first time.
period (datetime.timedelta): the time interval between reminder invocations after
the first invocation.
ttl (datetime.timedelta): the time interval before the reminder stops firing
period (Optional[datetime.timedelta]): the optional time interval between reminder
invocations after the first invocation. If None, the reminder uses the Dapr
runtime behavior for one-off or non-periodic reminders.
ttl (Optional[datetime.timedelta]): the optional time interval before the reminder
stops firing. If None, no explicit TTL is set.
failure_policy (Optional[ActorReminderFailurePolicy]): the optional policy for
handling reminder failures. If not set, the Dapr runtime default applies
(3 retries per tick).
"""
reminder = ActorReminderData(name, state, due_time, period, ttl)
reminder = ActorReminderData(name, state, due_time, period, ttl, failure_policy)
self._state_manager._mock_reminders[name] = reminder # type: ignore

async def unregister_reminder(self, name: str) -> None:
Expand Down
60 changes: 60 additions & 0 deletions tests/actor/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from dapr.actor.runtime._type_information import ActorTypeInformation
from dapr.actor.runtime.config import ActorRuntimeConfig
from dapr.actor.runtime.context import ActorRuntimeContext
from dapr.actor.runtime.failure_policy import ActorReminderFailurePolicy
from dapr.actor.runtime.runtime import ActorRuntime
from dapr.conf import settings
from dapr.serializers import DefaultJSONSerializer
Expand Down Expand Up @@ -151,6 +152,65 @@ def test_register_reminder(self):
'FakeSimpleReminderActor', 'test_id', 'test_reminder'
)

@mock.patch(
'tests.actor.fake_client.FakeDaprActorClient.register_reminder',
new=_async_mock(return_value=b'"ok"'),
)
def test_register_reminder_with_failure_policy(self):
test_actor_id = ActorId('test_id')
test_type_info = ActorTypeInformation.create(FakeSimpleReminderActor)
test_client = FakeDaprActorClient
ctx = ActorRuntimeContext(test_type_info, self._serializer, self._serializer, test_client)
test_actor = FakeSimpleReminderActor(ctx, test_actor_id)

_run(
test_actor.register_reminder(
'test_reminder',
b'reminder_message',
timedelta(seconds=1),
timedelta(seconds=1),
failure_policy=ActorReminderFailurePolicy.drop_policy(),
)
)
test_client.register_reminder.mock.assert_called_once()
test_client.register_reminder.mock.assert_called_with(
'FakeSimpleReminderActor',
'test_id',
'test_reminder',
b'{"reminderName":"test_reminder","dueTime":"0h0m1s0ms0\\u03bcs","period":"0h0m1s0ms0\\u03bcs","data":"cmVtaW5kZXJfbWVzc2FnZQ==","failurePolicy":{"drop":{}}}', # noqa E501
)

@mock.patch(
'tests.actor.fake_client.FakeDaprActorClient.register_reminder',
new=_async_mock(return_value=b'"ok"'),
)
def test_register_reminder_with_constant_failure_policy(self):
test_actor_id = ActorId('test_id')
test_type_info = ActorTypeInformation.create(FakeSimpleReminderActor)
test_client = FakeDaprActorClient
ctx = ActorRuntimeContext(test_type_info, self._serializer, self._serializer, test_client)
test_actor = FakeSimpleReminderActor(ctx, test_actor_id)

_run(
test_actor.register_reminder(
'test_reminder',
b'reminder_message',
timedelta(seconds=1),
timedelta(seconds=1),
failure_policy=ActorReminderFailurePolicy.constant_policy(
interval=timedelta(seconds=2),
max_retries=5,
),
)
)
test_client.register_reminder.mock.assert_called_once()
test_client.register_reminder.mock.assert_called_with(
'FakeSimpleReminderActor',
'test_id',
'test_reminder',
b'{"reminderName":"test_reminder","dueTime":"0h0m1s0ms0\\u03bcs","period":"0h0m1s0ms0\\u03bcs","data":"cmVtaW5kZXJfbWVzc2FnZQ==","failurePolicy":{"constant":{"interval":"0h0m2s0ms0\\u03bcs","maxRetries":5}}}', # noqa E501
)

@mock.patch(
'tests.actor.fake_client.FakeDaprActorClient.register_timer',
new=_async_mock(return_value=b'"ok"'),
Expand Down
Loading