Skip to content
4 changes: 4 additions & 0 deletions src/Grpc/orchestrator_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ message EntityOperationSignaledEvent {
google.protobuf.Timestamp scheduledTime = 3;
google.protobuf.StringValue input = 4;
google.protobuf.StringValue targetInstanceId = 5; // used only within histories, null in messages
TraceContext parentTraceContext = 6;
}

message EntityOperationCalledEvent {
Expand All @@ -192,6 +193,7 @@ message EntityOperationCalledEvent {
google.protobuf.StringValue parentInstanceId = 5; // used only within messages, null in histories
google.protobuf.StringValue parentExecutionId = 6; // used only within messages, null in histories
google.protobuf.StringValue targetInstanceId = 7; // used only within histories, null in messages
TraceContext parentTraceContext = 8;
}

message EntityLockRequestedEvent {
Expand Down Expand Up @@ -318,6 +320,8 @@ message SendEntityMessageAction {
EntityLockRequestedEvent entityLockRequested = 3;
EntityUnlockSentEvent entityUnlockSent = 4;
}

TraceContext parentTraceContext = 5;
Copy link
Member Author

@cgillum cgillum Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: don't merge changes to this file until properly vendored (requires microsoft/durabletask-protobuf#64 to be merged).

}

message OrchestratorAction {
Expand Down
7 changes: 7 additions & 0 deletions src/Shared/Grpc/ProtoUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ internal static P.OrchestratorResponse ConstructOrchestratorResponse(
out string requestId);

entityConversionState.EntityRequestIds.Add(requestId);
sendAction.ParentTraceContext = CreateTraceContext();

switch (sendAction.EntityMessageTypeCase)
{
Expand Down Expand Up @@ -636,6 +637,9 @@ internal static void ToEntityBatchRequest(
Id = Guid.Parse(op.EntityOperationSignaled.RequestId),
Operation = op.EntityOperationSignaled.Operation,
Input = op.EntityOperationSignaled.Input,
TraceContext = op.EntityOperationSignaled.ParentTraceContext is { } signalTc
? new DistributedTraceContext(signalTc.TraceParent, signalTc.TraceState)
: null,
});
operationInfos.Add(new P.OperationInfo
{
Expand All @@ -650,6 +654,9 @@ internal static void ToEntityBatchRequest(
Id = Guid.Parse(op.EntityOperationCalled.RequestId),
Operation = op.EntityOperationCalled.Operation,
Input = op.EntityOperationCalled.Input,
TraceContext = op.EntityOperationCalled.ParentTraceContext is { } calledTc
? new DistributedTraceContext(calledTc.TraceParent, calledTc.TraceState)
: null,
});
operationInfos.Add(new P.OperationInfo
{
Expand Down
5 changes: 5 additions & 0 deletions src/Shared/Grpc/Tracing/Schema.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,10 @@ public static class Task
/// The time at which the timer is scheduled to fire.
/// </summary>
public const string FireAt = "durabletask.fire_at";

/// <summary>
/// The name of the entity operation being executed.
/// </summary>
public const string EntityOperationName = "durabletask.entity.operation";
}
}
5 changes: 5 additions & 0 deletions src/Shared/Grpc/Tracing/TraceActivityConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ static class TraceActivityConstants
/// </summary>
public const string Event = "event";

/// <summary>
/// The name of the activity that represents entity operation execution.
/// </summary>
public const string EntityOperation = "entity_operation";

/// <summary>
/// The name of the activity that represents timer operations.
/// </summary>
Expand Down
165 changes: 165 additions & 0 deletions src/Shared/Grpc/Tracing/TraceHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,58 @@ static class TraceHelper
return newActivity;
}

/// <summary>
/// Creates and starts the server-side trace activity that covers the execution of one entity operation.
/// </summary>
/// <param name="entityName">Name of the entity; used in the span name and recorded as the task name tag.</param>
/// <param name="operationName">
/// Name of the operation being executed. When null or empty it is omitted from both the span name and the tags.
/// </param>
/// <param name="instanceId">Instance ID of the entity; recorded as the instance ID tag.</param>
/// <param name="traceParent">W3C traceparent header value of the parent (the caller's trace context).</param>
/// <param name="traceState">W3C tracestate header value of the parent (the caller's trace context).</param>
/// <returns>
/// The started <see cref="Activity"/>, or null when <paramref name="traceParent"/> is null or cannot be parsed,
/// or when the activity source does not create an activity.
/// </returns>
public static Activity? StartTraceActivityForEntityOperation(
    string entityName,
    string? operationName,
    string instanceId,
    string? traceParent,
    string? traceState)
{
    // No parent trace context was supplied: nothing to attach a span to.
    if (traceParent is null)
    {
        return null;
    }

    // A malformed parent trace context is treated the same as a missing one.
    if (!ActivityContext.TryParse(traceParent, traceState, out ActivityContext parent))
    {
        return null;
    }

    bool hasOperationName = !string.IsNullOrEmpty(operationName);

    // Span name is "<prefix>:<entity>" with ":<operation>" appended only when an operation name exists.
    string spanName = $"{TraceActivityConstants.EntityOperation}:{entityName}";
    if (hasOperationName)
    {
        spanName = $"{spanName}:{operationName}";
    }

    Activity? entityActivity = ActivityTraceSource.StartActivity(
        spanName,
        kind: ActivityKind.Server,
        parentContext: parent);

    if (entityActivity is null)
    {
        return null;
    }

    entityActivity.SetTag(Schema.Task.Type, TraceActivityConstants.EntityOperation);
    entityActivity.SetTag(Schema.Task.Name, entityName);
    entityActivity.SetTag(Schema.Task.InstanceId, instanceId);

    if (hasOperationName)
    {
        entityActivity.SetTag(Schema.Task.EntityOperationName, operationName);
    }

    return entityActivity;
}

/// <summary>
/// Emits a new trace activity for a (task) activity that successfully completes.
/// </summary>
Expand Down Expand Up @@ -202,6 +254,51 @@ public static void EmitTraceActivityForTaskFailed(
activity?.Dispose();
}

/// <summary>
/// Emits the trace activity for an entity operation that completed successfully.
/// </summary>
/// <param name="instanceId">The ID of the associated orchestration.</param>
/// <param name="historyEvent">The associated <see cref="P.HistoryEvent" />.</param>
/// <param name="calledEvent">The associated <see cref="P.EntityOperationCalledEvent"/>.</param>
public static void EmitTraceActivityForEntityOperationCompleted(
    string? instanceId,
    P.HistoryEvent? historyEvent,
    P.EntityOperationCalledEvent? calledEvent)
{
    // Start the scheduling activity and dispose it right away; nothing happens when no activity was created.
    StartTraceActivityForSchedulingEntityOperation(instanceId, historyEvent, calledEvent)?.Dispose();
}

/// <summary>
/// Emits the trace activity for an entity operation that failed, marking it with an error status
/// when failure information is available.
/// </summary>
/// <param name="instanceId">The ID of the associated orchestration.</param>
/// <param name="historyEvent">The associated <see cref="P.HistoryEvent" />.</param>
/// <param name="calledEvent">The associated <see cref="P.EntityOperationCalledEvent"/>.</param>
/// <param name="failedEvent">
/// The associated <see cref="P.EntityOperationFailedEvent"/>. When null, the activity is emitted without an error status.
/// </param>
public static void EmitTraceActivityForEntityOperationFailed(
    string? instanceId,
    P.HistoryEvent? historyEvent,
    P.EntityOperationCalledEvent? calledEvent,
    P.EntityOperationFailedEvent? failedEvent)
{
    Activity? schedulingActivity =
        StartTraceActivityForSchedulingEntityOperation(instanceId, historyEvent, calledEvent);

    // No activity was created, so there is nothing to annotate or emit.
    if (schedulingActivity is null)
    {
        return;
    }

    if (failedEvent is { } failure)
    {
        // Fall back to a generic description when the failure carries no error message.
        string description = failure.FailureDetails?.ErrorMessage ?? "Unspecified entity operation failure";
        schedulingActivity.SetStatus(ActivityStatusCode.Error, description);
    }

    schedulingActivity.Dispose();
}

/// <summary>
/// Emits a new trace activity for sub-orchestration execution when the sub-orchestration
/// completes successfully.
Expand Down Expand Up @@ -409,6 +506,74 @@ static string CreateSpanName(string spanDescription, string? taskName, string? t
return newActivity;
}

/// <summary>
/// Starts a new trace activity for scheduling an entity operation. Represents the time between
/// enqueuing the entity operation message and it completing.
/// </summary>
/// <param name="instanceId">The ID of the associated orchestration; recorded as the instance ID tag.</param>
/// <param name="historyEvent">
/// The associated <see cref="P.HistoryEvent" />. Supplies the target entity instance ID and the
/// start time of the activity.
/// </param>
/// <param name="calledEvent">
/// The associated <see cref="P.EntityOperationCalledEvent"/>. Supplies the operation name and the
/// parent trace context.
/// </param>
/// <returns>
/// Returns a newly started <see cref="Activity"/> with entity operation metadata, or null when
/// <paramref name="calledEvent"/> is null or the activity source does not create an activity.
/// </returns>
static Activity? StartTraceActivityForSchedulingEntityOperation(
string? instanceId,
P.HistoryEvent? historyEvent,
P.EntityOperationCalledEvent? calledEvent)
{
// Without the originating "called" event there is nothing to describe, so no activity is emitted.
if (calledEvent == null)
{
return null;
}

// The target instance ID is read from the history event; it falls back to an empty string when unavailable.
string targetInstanceId = historyEvent?.EntityOperationCalled?.TargetInstanceId ?? string.Empty;

// Extract entity name from instance ID (format: "@name@key").
// If the ID does not match that shape, the whole ID is used as the entity name.
string entityName = targetInstanceId;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we have APIs to do this in EntityInstanceId if I'm not mistaken?

if (targetInstanceId.Length > 1 && targetInstanceId[0] == '@')
{
int secondAt = targetInstanceId.IndexOf('@', 1);
if (secondAt > 1)
{
entityName = targetInstanceId.Substring(1, secondAt - 1);
}
}
// Span name is "<prefix>:<entity>", with ":<operation>" appended only when the operation name is non-empty.
string spanName = string.IsNullOrEmpty(calledEvent.Operation)
? $"{TraceActivityConstants.EntityOperation}:{entityName}"
: $"{TraceActivityConstants.EntityOperation}:{entityName}:{calledEvent.Operation}";

// The activity starts at the history event's timestamp (default when absent) and is parented
// to the ambient Activity.Current context (default when there is none).
Activity? newActivity = ActivityTraceSource.StartActivity(
spanName,
kind: ActivityKind.Client,
startTime: historyEvent?.Timestamp?.ToDateTimeOffset() ?? default,
parentContext: Activity.Current?.Context ?? default);

if (newActivity == null)
{
return null;
}

// When the called event carries a parseable trace context, its span ID is applied to the new activity.
// NOTE(review): presumably this makes the span ID match the one already propagated to the entity so the
// server-side span parents to this span; confirm against the SetSpanId helper.
if (calledEvent.ParentTraceContext != null
&& ActivityContext.TryParse(
calledEvent.ParentTraceContext.TraceParent,
calledEvent.ParentTraceContext?.TraceState,
out ActivityContext parentContext))
{
newActivity.SetSpanId(parentContext.SpanId.ToString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you happen to know why we use Activity.Current.Context as the parent trace context but manually set the span ID here? Why aren't we setting the parent trace context to just be the parentContext extracted here?

I see this is done for the other methods here as well but I forget (or perhaps never understood) why

}

newActivity.AddTag(Schema.Task.Type, TraceActivityConstants.EntityOperation);
newActivity.AddTag(Schema.Task.Name, entityName);
newActivity.AddTag(Schema.Task.InstanceId, instanceId);

// The operation name tag is only added when an operation name is present.
if (!string.IsNullOrEmpty(calledEvent.Operation))
{
newActivity.AddTag(Schema.Task.EntityOperationName, calledEvent.Operation);
}

return newActivity;
}

/// <summary>
/// Starts a new trace activity for sub-orchestrations. Represents the time between enqueuing
/// the sub-orchestration message and it completing.
Expand Down
60 changes: 59 additions & 1 deletion src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,14 @@ async Task OnRunOrchestratorAsync(
return taskScheduledEvent;
}

// Finds the first past EntityOperationCalled history event whose request ID matches, or null when none exists.
P.HistoryEvent? GetEntityOperationCalledEvent(string requestId)
{
    // The event-type check runs first, so EntityOperationCalled is only read on events of that type.
    return request.PastEvents.FirstOrDefault(
        pastEvent => pastEvent.EventTypeCase == P.HistoryEvent.EventTypeOneofCase.EntityOperationCalled
            && pastEvent.EntityOperationCalled.RequestId == requestId);
}

foreach (var newEvent in request.NewEvents)
{
switch (newEvent.EventTypeCase)
Expand Down Expand Up @@ -565,6 +573,33 @@ async Task OnRunOrchestratorAsync(
newEvent.Timestamp.ToDateTime(),
newEvent.TimerFired);
break;

case P.HistoryEvent.EventTypeOneofCase.EntityOperationCompleted:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the logic for emitting client spans for an entity signal request? Maybe I'm missing something, but it looks like this is just for entity calls.

{
P.HistoryEvent? entityCalledEvent =
GetEntityOperationCalledEvent(
newEvent.EntityOperationCompleted.RequestId);

TraceHelper.EmitTraceActivityForEntityOperationCompleted(
request.InstanceId,
entityCalledEvent,
entityCalledEvent?.EntityOperationCalled);
break;
}

case P.HistoryEvent.EventTypeOneofCase.EntityOperationFailed:
{
P.HistoryEvent? entityCalledEvent =
GetEntityOperationCalledEvent(
newEvent.EntityOperationFailed.RequestId);

TraceHelper.EmitTraceActivityForEntityOperationFailed(
request.InstanceId,
entityCalledEvent,
entityCalledEvent?.EntityOperationCalled,
newEvent.EntityOperationFailed);
break;
}
}
}
}
Expand Down Expand Up @@ -865,6 +900,20 @@ async Task OnRunEntityBatchAsync(
var coreEntityId = DTCore.Entities.EntityId.FromString(batchRequest.InstanceId!);
EntityId entityId = new(coreEntityId.Name, coreEntityId.Key);

// Start a Server trace span for each entity operation in the batch.
// Each operation may come from a different orchestration with its own trace context,
// so we create individual spans to preserve correct parent-child relationships.
List<Activity> traceActivities = batchRequest.Operations?
.Select(op => TraceHelper.StartTraceActivityForEntityOperation(
entityId.Name,
op.Operation,
batchRequest.InstanceId!,
op.TraceContext?.TraceParent,
op.TraceContext?.TraceState))
.OfType<Activity>()
.ToList()
?? [];

TaskName name = new(entityId.Name);

EntityBatchResult? batchResult;
Expand All @@ -885,6 +934,9 @@ async Task OnRunEntityBatchAsync(
{
// we could not find the entity. This is considered an application error,
// so we return a non-retriable error-OperationResult for each operation in the batch.
string errorMessage = $"No entity task named '{name}' was found.";
traceActivities.ForEach(a => a.SetStatus(ActivityStatusCode.Error, errorMessage));

batchResult = new EntityBatchResult()
{
Actions = [], // no actions
Expand All @@ -894,7 +946,7 @@ async Task OnRunEntityBatchAsync(
{
FailureDetails = new FailureDetails(
errorType: "EntityTaskNotFound",
errorMessage: $"No entity task named '{name}' was found.",
errorMessage: errorMessage,
stackTrace: null,
innerFailure: null,
isNonRetriable: true),
Expand All @@ -913,6 +965,12 @@ async Task OnRunEntityBatchAsync(
{
FailureDetails = new FailureDetails(frameworkException),
};

traceActivities.ForEach(a => a.SetStatus(ActivityStatusCode.Error, frameworkException.Message));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again just a question for my own understanding - in this case and the case above, we will just end up with Activities with just server spans and no corresponding client span?

}
finally
{
traceActivities.ForEach(a => a.Dispose());
}

P.EntityBatchResult response = batchResult.ToEntityBatchResult(
Expand Down
Loading
Loading