diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py index cd5e632f..339b6b77 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py @@ -26,6 +26,7 @@ from durabletask.aio import client as aioclient from grpc.aio import AioRpcError +from dapr.aio.clients.grpc.interceptors import DaprClientTimeoutInterceptorAsync from dapr.clients import DaprInternalError from dapr.clients.http.client import DAPR_API_TOKEN_HEADER from dapr.conf import settings @@ -68,6 +69,7 @@ def __init__( secure_channel=uri.tls, log_handler=options.log_handler, log_formatter=options.log_formatter, + interceptors=[DaprClientTimeoutInterceptorAsync()], ) async def schedule_new_workflow( diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py index 36a731c4..1aa85472 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py @@ -27,6 +27,7 @@ from grpc import RpcError from dapr.clients import DaprInternalError +from dapr.clients.grpc.interceptors import DaprClientTimeoutInterceptor from dapr.clients.http.client import DAPR_API_TOKEN_HEADER from dapr.conf import settings from dapr.conf.helpers import GrpcEndpoint @@ -71,6 +72,7 @@ def __init__( secure_channel=uri.tls, log_handler=options.log_handler, log_formatter=options.log_formatter, + interceptors=[DaprClientTimeoutInterceptor()], ) def schedule_new_workflow( diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py index e2bf50d4..4c47c566 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py @@ -27,6 +27,7 @@ from durabletask import task, worker from dapr.clients import DaprInternalError +from dapr.clients.grpc.interceptors import DaprClientTimeoutInterceptor from dapr.clients.http.client import DAPR_API_TOKEN_HEADER from dapr.conf import settings from dapr.conf.helpers import GrpcEndpoint @@ -71,13 +72,17 @@ def __init__( raise DaprInternalError(f'{error}') from error options = self._logger.get_options() + all_interceptors = [] + if interceptors: + all_interceptors.extend(interceptors) + all_interceptors.append(DaprClientTimeoutInterceptor()) self.__worker = worker.TaskHubGrpcWorker( host_address=uri.endpoint, metadata=metadata, secure_channel=uri.tls, log_handler=options.log_handler, log_formatter=options.log_formatter, - interceptors=interceptors, + interceptors=all_interceptors, concurrency_options=worker.ConcurrencyOptions( maximum_concurrent_activity_work_items=maximum_concurrent_activity_work_items, maximum_concurrent_orchestration_work_items=maximum_concurrent_orchestration_work_items,