Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ ADDED
- Added `AsyncTaskHubGrpcClient` for asyncio-based applications using `grpc.aio`
- Added `DefaultAsyncClientInterceptorImpl` for async gRPC metadata interceptors
- Added `get_async_grpc_channel` helper for creating async gRPC channels
- Improved distributed tracing support with full span coverage for orchestrations, activities, sub-orchestrations, timers, and events

CHANGED

Expand Down
73 changes: 50 additions & 23 deletions durabletask/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Licensed under the MIT License.

import logging
import uuid
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
Expand All @@ -16,6 +17,7 @@
import durabletask.internal.orchestrator_service_pb2 as pb
import durabletask.internal.orchestrator_service_pb2_grpc as stubs
import durabletask.internal.shared as shared
import durabletask.internal.tracing as tracing
from durabletask import task
from durabletask.internal.client_helpers import (
build_query_entities_req,
Expand Down Expand Up @@ -176,14 +178,28 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
tags: Optional[dict[str, str]] = None,
version: Optional[str] = None) -> str:

req = build_schedule_new_orchestration_req(
orchestrator, input=input, instance_id=instance_id, start_at=start_at,
reuse_id_policy=reuse_id_policy, tags=tags,
version=version if version else self.default_version)

self._logger.info(f"Starting new '{req.name}' instance with ID = '{req.instanceId}'.")
res: pb.CreateInstanceResponse = self._stub.StartInstance(req)
return res.instanceId
name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)
resolved_instance_id = instance_id if instance_id else uuid.uuid4().hex
resolved_version = version if version else self.default_version

with tracing.start_create_orchestration_span(
name, resolved_instance_id, version=resolved_version,
):
req = build_schedule_new_orchestration_req(
orchestrator, input=input, instance_id=instance_id, start_at=start_at,
reuse_id_policy=reuse_id_policy, tags=tags,
version=version if version else self.default_version)

# Inject the active PRODUCER span context into the request so the sidecar
# stores it in the executionStarted event and the worker can parent all
# orchestration/activity/timer spans under this trace.
parent_trace_ctx = tracing.get_current_trace_context()
if parent_trace_ctx is not None:
req.parentTraceContext.CopyFrom(parent_trace_ctx)

self._logger.info(f"Starting new '{req.name}' instance with ID = '{req.instanceId}'.")
res: pb.CreateInstanceResponse = self._stub.StartInstance(req)
return res.instanceId

def get_orchestration_state(self, instance_id: str, *, fetch_payloads: bool = True) -> Optional[OrchestrationState]:
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
Expand Down Expand Up @@ -245,10 +261,10 @@ def wait_for_orchestration_completion(self, instance_id: str, *,

def raise_orchestration_event(self, instance_id: str, event_name: str, *,
data: Optional[Any] = None) -> None:
req = build_raise_event_req(instance_id, event_name, data)

self._logger.info(f"Raising event '{event_name}' for instance '{instance_id}'.")
self._stub.RaiseEvent(req)
with tracing.start_raise_event_span(event_name, instance_id):
req = build_raise_event_req(instance_id, event_name, data)
self._logger.info(f"Raising event '{event_name}' for instance '{instance_id}'.")
self._stub.RaiseEvent(req)

def terminate_orchestration(self, instance_id: str, *,
output: Optional[Any] = None,
Expand Down Expand Up @@ -418,14 +434,25 @@ async def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator
tags: Optional[dict[str, str]] = None,
version: Optional[str] = None) -> str:

req = build_schedule_new_orchestration_req(
orchestrator, input=input, instance_id=instance_id, start_at=start_at,
reuse_id_policy=reuse_id_policy, tags=tags,
version=version if version else self.default_version)
name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)
resolved_instance_id = instance_id if instance_id else uuid.uuid4().hex
resolved_version = version if version else self.default_version

self._logger.info(f"Starting new '{req.name}' instance with ID = '{req.instanceId}'.")
res: pb.CreateInstanceResponse = await self._stub.StartInstance(req)
return res.instanceId
with tracing.start_create_orchestration_span(
name, resolved_instance_id, version=resolved_version,
):
req = build_schedule_new_orchestration_req(
orchestrator, input=input, instance_id=instance_id, start_at=start_at,
reuse_id_policy=reuse_id_policy, tags=tags,
version=version if version else self.default_version)

parent_trace_ctx = tracing.get_current_trace_context()
if parent_trace_ctx is not None:
req.parentTraceContext.CopyFrom(parent_trace_ctx)

self._logger.info(f"Starting new '{req.name}' instance with ID = '{req.instanceId}'.")
res: pb.CreateInstanceResponse = await self._stub.StartInstance(req)
return res.instanceId

async def get_orchestration_state(self, instance_id: str, *,
fetch_payloads: bool = True) -> Optional[OrchestrationState]:
Expand Down Expand Up @@ -487,10 +514,10 @@ async def wait_for_orchestration_completion(self, instance_id: str, *,

async def raise_orchestration_event(self, instance_id: str, event_name: str, *,
data: Optional[Any] = None) -> None:
req = build_raise_event_req(instance_id, event_name, data)

self._logger.info(f"Raising event '{event_name}' for instance '{instance_id}'.")
await self._stub.RaiseEvent(req)
with tracing.start_raise_event_span(event_name, instance_id):
req = build_raise_event_req(instance_id, event_name, data)
self._logger.info(f"Raising event '{event_name}' for instance '{instance_id}'.")
await self._stub.RaiseEvent(req)

async def terminate_orchestration(self, instance_id: str, *,
output: Optional[Any] = None,
Expand Down
18 changes: 12 additions & 6 deletions durabletask/internal/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ def new_orchestrator_completed_event() -> pb.HistoryEvent:

def new_execution_started_event(name: str, instance_id: str, encoded_input: Optional[str] = None,
tags: Optional[dict[str, str]] = None,
version: Optional[str] = None) -> pb.HistoryEvent:
version: Optional[str] = None,
parent_trace_context: Optional[pb.TraceContext] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=-1,
timestamp=timestamp_pb2.Timestamp(),
Expand All @@ -36,7 +37,8 @@ def new_execution_started_event(name: str, instance_id: str, encoded_input: Opti
version=get_string_value(version),
input=get_string_value(encoded_input),
orchestrationInstance=pb.OrchestrationInstance(instanceId=instance_id),
tags=tags))
tags=tags,
parentTraceContext=parent_trace_context))


def new_timer_created_event(timer_id: int, fire_at: datetime) -> pb.HistoryEvent:
Expand Down Expand Up @@ -223,11 +225,13 @@ def new_create_timer_action(id: int, fire_at: datetime) -> pb.OrchestratorAction


def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str],
tags: Optional[dict[str, str]]) -> pb.OrchestratorAction:
tags: Optional[dict[str, str]],
parent_trace_context: Optional[pb.TraceContext] = None) -> pb.OrchestratorAction:
return pb.OrchestratorAction(id=id, scheduleTask=pb.ScheduleTaskAction(
name=name,
input=get_string_value(encoded_input),
tags=tags
tags=tags,
parentTraceContext=parent_trace_context,
))


Expand Down Expand Up @@ -302,12 +306,14 @@ def new_create_sub_orchestration_action(
name: str,
instance_id: Optional[str],
encoded_input: Optional[str],
version: Optional[str]) -> pb.OrchestratorAction:
version: Optional[str],
parent_trace_context: Optional[pb.TraceContext] = None) -> pb.OrchestratorAction:
return pb.OrchestratorAction(id=id, createSubOrchestration=pb.CreateSubOrchestrationAction(
name=name,
instanceId=instance_id,
input=get_string_value(encoded_input),
version=get_string_value(version)
version=get_string_value(version),
parentTraceContext=parent_trace_context,
))


Expand Down
Loading
Loading