-
Notifications
You must be signed in to change notification settings - Fork 53
Propagate trace context for entity operations #654
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
ab59072
d2057e0
2df4cdd
6fade3c
ee1b67a
b6d5ae6
1d199d5
764c94a
f78f80b
0e4deef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
sophiatev marked this conversation as resolved.
Show resolved
Hide resolved
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -156,6 +156,58 @@ static class TraceHelper | |
| return newActivity; | ||
| } | ||
|
|
||
/// <summary>
/// Starts the server-side trace activity that represents the execution of a single entity operation.
/// </summary>
/// <param name="entityName">The name of the entity; used in the span name and the task name tag.</param>
/// <param name="operationName">The name of the operation being executed; omitted from the span name and tags when null or empty.</param>
/// <param name="instanceId">The instance ID of the entity.</param>
/// <param name="traceParent">The W3C traceparent header value from the parent orchestration.</param>
/// <param name="traceState">The W3C tracestate header value from the parent orchestration.</param>
/// <returns>
/// The newly started <see cref="Activity"/> tagged with entity operation metadata, or null when
/// <paramref name="traceParent"/> is null or cannot be parsed, or when the activity source does not start an activity.
/// </returns>
public static Activity? StartTraceActivityForEntityOperation(
    string entityName,
    string? operationName,
    string instanceId,
    string? traceParent,
    string? traceState)
{
    // No parent trace context means there is nothing to attach the span to.
    if (traceParent is null)
    {
        return null;
    }

    if (!ActivityContext.TryParse(traceParent, traceState, out ActivityContext parentContext))
    {
        return null;
    }

    // Span name is "<type>:<entity>" with an optional ":<operation>" suffix.
    bool hasOperationName = !string.IsNullOrEmpty(operationName);
    string spanName = $"{TraceActivityConstants.EntityOperation}:{entityName}";
    if (hasOperationName)
    {
        spanName = $"{spanName}:{operationName}";
    }

    Activity? entityActivity = ActivityTraceSource.StartActivity(
        spanName,
        kind: ActivityKind.Server,
        parentContext: parentContext);

    if (entityActivity is null)
    {
        return null;
    }

    entityActivity
        .SetTag(Schema.Task.Type, TraceActivityConstants.EntityOperation)
        .SetTag(Schema.Task.Name, entityName)
        .SetTag(Schema.Task.InstanceId, instanceId);

    if (hasOperationName)
    {
        entityActivity.SetTag(Schema.Task.EntityOperationName, operationName);
    }

    return entityActivity;
}
|
|
||
| /// <summary> | ||
| /// Emits a new trace activity for a (task) activity that successfully completes. | ||
| /// </summary> | ||
|
|
@@ -202,6 +254,51 @@ public static void EmitTraceActivityForTaskFailed( | |
| activity?.Dispose(); | ||
| } | ||
|
|
||
/// <summary>
/// Emits the trace activity for an entity operation that completed successfully.
/// </summary>
/// <param name="instanceId">The ID of the associated orchestration.</param>
/// <param name="historyEvent">The associated <see cref="P.HistoryEvent" />.</param>
/// <param name="calledEvent">The associated <see cref="P.EntityOperationCalledEvent"/>.</param>
public static void EmitTraceActivityForEntityOperationCompleted(
    string? instanceId,
    P.HistoryEvent? historyEvent,
    P.EntityOperationCalledEvent? calledEvent)
{
    // Start the scheduling span and end it right away; nothing happens when no span was started.
    StartTraceActivityForSchedulingEntityOperation(instanceId, historyEvent, calledEvent)?.Dispose();
}
|
|
||
/// <summary>
/// Emits the trace activity for an entity operation that failed.
/// </summary>
/// <param name="instanceId">The ID of the associated orchestration.</param>
/// <param name="historyEvent">The associated <see cref="P.HistoryEvent" />.</param>
/// <param name="calledEvent">The associated <see cref="P.EntityOperationCalledEvent"/>.</param>
/// <param name="failedEvent">
/// The associated <see cref="P.EntityOperationFailedEvent"/>; when null, the activity is emitted without an error status.
/// </param>
public static void EmitTraceActivityForEntityOperationFailed(
    string? instanceId,
    P.HistoryEvent? historyEvent,
    P.EntityOperationCalledEvent? calledEvent,
    P.EntityOperationFailedEvent? failedEvent)
{
    // The using declaration ends the span (if one was started) when the method exits.
    using Activity? failedSpan = StartTraceActivityForSchedulingEntityOperation(instanceId, historyEvent, calledEvent);

    if (failedSpan is not null && failedEvent is not null)
    {
        // Fall back to a generic description when the failure carries no error message.
        string description = failedEvent.FailureDetails?.ErrorMessage ?? "Unspecified entity operation failure";
        failedSpan.SetStatus(ActivityStatusCode.Error, description);
    }
}
|
|
||
| /// <summary> | ||
| /// Emits a new trace activity for sub-orchestration execution when the sub-orchestration | ||
| /// completes successfully. | ||
|
|
@@ -409,6 +506,74 @@ static string CreateSpanName(string spanDescription, string? taskName, string? t | |
| return newActivity; | ||
| } | ||
|
|
||
/// <summary>
/// Starts a new trace activity for scheduling an entity operation. Represents the time between
/// enqueuing the entity operation message and it completing.
/// </summary>
/// <param name="instanceId">The ID of the associated orchestration.</param>
/// <param name="historyEvent">
/// The associated <see cref="P.HistoryEvent" />. Its timestamp, when present, is used as the start time of the activity.
/// </param>
/// <param name="calledEvent">The associated <see cref="P.EntityOperationCalledEvent"/>.</param>
/// <returns>
/// Returns a newly started <see cref="Activity"/> with entity operation metadata, or null if
/// <paramref name="calledEvent"/> is null or the activity source does not start an activity.
/// </returns>
static Activity? StartTraceActivityForSchedulingEntityOperation(
    string? instanceId,
    P.HistoryEvent? historyEvent,
    P.EntityOperationCalledEvent? calledEvent)
{
    if (calledEvent == null)
    {
        return null;
    }

    // Prefer the target recorded on the history event; fall back to the called event itself so the
    // entity name is not lost when no history event (or one without the called payload) is supplied.
    string targetInstanceId = historyEvent?.EntityOperationCalled?.TargetInstanceId
        ?? calledEvent.TargetInstanceId
        ?? string.Empty;

    // Extract entity name from instance ID (format: "@name@key"). If the ID does not match that
    // shape, the whole ID is used as the name.
    // NOTE(review): a reviewer believed an existing API already performs this parsing - TODO confirm and reuse it.
    string entityName = targetInstanceId;
    if (targetInstanceId.Length > 1 && targetInstanceId[0] == '@')
    {
        int secondAt = targetInstanceId.IndexOf('@', 1);
        if (secondAt > 1)
        {
            entityName = targetInstanceId.Substring(1, secondAt - 1);
        }
    }

    string spanName = string.IsNullOrEmpty(calledEvent.Operation)
        ? $"{TraceActivityConstants.EntityOperation}:{entityName}"
        : $"{TraceActivityConstants.EntityOperation}:{entityName}:{calledEvent.Operation}";

    Activity? newActivity = ActivityTraceSource.StartActivity(
        spanName,
        kind: ActivityKind.Client,
        startTime: historyEvent?.Timestamp?.ToDateTimeOffset() ?? default,
        parentContext: Activity.Current?.Context ?? default);

    if (newActivity == null)
    {
        return null;
    }

    if (calledEvent.ParentTraceContext != null
        && ActivityContext.TryParse(
            calledEvent.ParentTraceContext.TraceParent,
            calledEvent.ParentTraceContext.TraceState,
            out ActivityContext parentContext))
    {
        // Overwrite this span's ID with the span ID carried by the called event's trace context.
        // NOTE(review): a reviewer asked why this is done (the sibling methods reportedly do the same) -
        // the reason should be documented once confirmed.
        newActivity.SetSpanId(parentContext.SpanId.ToString());
    }

    newActivity.AddTag(Schema.Task.Type, TraceActivityConstants.EntityOperation);
    newActivity.AddTag(Schema.Task.Name, entityName);
    newActivity.AddTag(Schema.Task.InstanceId, instanceId);

    if (!string.IsNullOrEmpty(calledEvent.Operation))
    {
        newActivity.AddTag(Schema.Task.EntityOperationName, calledEvent.Operation);
    }

    return newActivity;
}
|
|
||
| /// <summary> | ||
| /// Starts a new trace activity for sub-orchestrations. Represents the time between enqueuing | ||
| /// the sub-orchestration message and it completing. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -502,6 +502,14 @@ async Task OnRunOrchestratorAsync( | |
| return taskScheduledEvent; | ||
| } | ||
|
|
||
// Finds, among the request's past events, the EntityOperationCalled history event whose
// request ID equals the given one; returns null when there is no such event.
P.HistoryEvent? GetEntityOperationCalledEvent(string requestId)
{
    return request.PastEvents.FirstOrDefault(pastEvent =>
        pastEvent.EventTypeCase == P.HistoryEvent.EventTypeOneofCase.EntityOperationCalled
        && pastEvent.EntityOperationCalled.RequestId == requestId);
}
|
|
||
| foreach (var newEvent in request.NewEvents) | ||
| { | ||
| switch (newEvent.EventTypeCase) | ||
|
|
@@ -565,6 +573,33 @@ async Task OnRunOrchestratorAsync( | |
| newEvent.Timestamp.ToDateTime(), | ||
| newEvent.TimerFired); | ||
| break; | ||
|
|
||
| case P.HistoryEvent.EventTypeOneofCase.EntityOperationCompleted: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Where is the logic for emitting client spans for an entity signal request? Maybe I'm missing something, but it looks like this is just for entity calls. |
||
| { | ||
| P.HistoryEvent? entityCalledEvent = | ||
| GetEntityOperationCalledEvent( | ||
| newEvent.EntityOperationCompleted.RequestId); | ||
|
|
||
| TraceHelper.EmitTraceActivityForEntityOperationCompleted( | ||
| request.InstanceId, | ||
| entityCalledEvent, | ||
| entityCalledEvent?.EntityOperationCalled); | ||
| break; | ||
| } | ||
|
|
||
| case P.HistoryEvent.EventTypeOneofCase.EntityOperationFailed: | ||
| { | ||
| P.HistoryEvent? entityCalledEvent = | ||
| GetEntityOperationCalledEvent( | ||
| newEvent.EntityOperationFailed.RequestId); | ||
|
|
||
| TraceHelper.EmitTraceActivityForEntityOperationFailed( | ||
| request.InstanceId, | ||
| entityCalledEvent, | ||
| entityCalledEvent?.EntityOperationCalled, | ||
| newEvent.EntityOperationFailed); | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -865,6 +900,20 @@ async Task OnRunEntityBatchAsync( | |
| var coreEntityId = DTCore.Entities.EntityId.FromString(batchRequest.InstanceId!); | ||
| EntityId entityId = new(coreEntityId.Name, coreEntityId.Key); | ||
|
|
||
| // Start a Server trace span for each entity operation in the batch. | ||
| // Each operation may come from a different orchestration with its own trace context, | ||
| // so we create individual spans to preserve correct parent-child relationships. | ||
| List<Activity> traceActivities = batchRequest.Operations? | ||
| .Select(op => TraceHelper.StartTraceActivityForEntityOperation( | ||
| entityId.Name, | ||
| op.Operation, | ||
| batchRequest.InstanceId!, | ||
| op.TraceContext?.TraceParent, | ||
| op.TraceContext?.TraceState)) | ||
| .OfType<Activity>() | ||
| .ToList() | ||
| ?? []; | ||
|
|
||
| TaskName name = new(entityId.Name); | ||
|
|
||
| EntityBatchResult? batchResult; | ||
|
|
@@ -885,6 +934,9 @@ async Task OnRunEntityBatchAsync( | |
| { | ||
| // we could not find the entity. This is considered an application error, | ||
| // so we return a non-retriable error-OperationResult for each operation in the batch. | ||
| string errorMessage = $"No entity task named '{name}' was found."; | ||
| traceActivities.ForEach(a => a.SetStatus(ActivityStatusCode.Error, errorMessage)); | ||
|
|
||
| batchResult = new EntityBatchResult() | ||
| { | ||
| Actions = [], // no actions | ||
|
|
@@ -894,7 +946,7 @@ async Task OnRunEntityBatchAsync( | |
| { | ||
| FailureDetails = new FailureDetails( | ||
| errorType: "EntityTaskNotFound", | ||
| errorMessage: $"No entity task named '{name}' was found.", | ||
| errorMessage: errorMessage, | ||
| stackTrace: null, | ||
| innerFailure: null, | ||
| isNonRetriable: true), | ||
|
|
@@ -913,6 +965,12 @@ async Task OnRunEntityBatchAsync( | |
| { | ||
| FailureDetails = new FailureDetails(frameworkException), | ||
| }; | ||
|
|
||
| traceActivities.ForEach(a => a.SetStatus(ActivityStatusCode.Error, frameworkException.Message)); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Again, just a question for my own understanding: in this case and the case above, will we end up with Activities that have only server spans and no corresponding client span? |
||
| } | ||
| finally | ||
| { | ||
| traceActivities.ForEach(a => a.Dispose()); | ||
cgillum marked this conversation as resolved.
Dismissed
Show dismissed
Hide dismissed
|
||
| } | ||
|
|
||
| P.EntityBatchResult response = batchResult.ToEntityBatchResult( | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note to self: don't merge changes to this file until properly vendored (requires microsoft/durabletask-protobuf#64 to be merged).