1313
1414package io .dapr .durabletask ;
1515
16- import com .google .protobuf .StringValue ;
1716import io .dapr .durabletask .implementation .protobuf .OrchestratorService ;
18- import io .dapr .durabletask .implementation .protobuf .OrchestratorService .TaskFailureDetails ;
1917import io .dapr .durabletask .implementation .protobuf .TaskHubSidecarServiceGrpc ;
2018import io .dapr .durabletask .orchestration .TaskOrchestrationFactories ;
19+ import io .dapr .durabletask .runner .ActivityRunner ;
20+ import io .dapr .durabletask .runner .OrchestratorRunner ;
2121import io .grpc .Channel ;
2222import io .grpc .ManagedChannel ;
2323import io .grpc .ManagedChannelBuilder ;
2424import io .grpc .Status ;
2525import io .grpc .StatusRuntimeException ;
26- import org .apache .commons .lang3 .StringUtils ;
26+ import io .opentelemetry .api .GlobalOpenTelemetry ;
27+ import io .opentelemetry .api .trace .Tracer ;
28+ import io .opentelemetry .api .trace .propagation .W3CTraceContextPropagator ;
29+ import io .opentelemetry .context .Context ;
30+ import io .opentelemetry .context .propagation .TextMapGetter ;
2731
2832import java .time .Duration ;
2933import java .util .HashMap ;
3034import java .util .Iterator ;
35+ import java .util .Map ;
3136import java .util .concurrent .ExecutorService ;
3237import java .util .concurrent .Executors ;
3338import java .util .concurrent .TimeUnit ;
@@ -53,6 +58,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
5358 private final Duration maximumTimerInterval ;
5459 private final ExecutorService workerPool ;
5560 private final String appId ; // App ID for cross-app routing
61+ private final Tracer tracer ;
5662
5763 private final TaskHubSidecarServiceGrpc .TaskHubSidecarServiceBlockingStub sidecarClient ;
5864 private final boolean isExecutorServiceManaged ;
@@ -84,12 +90,20 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
8490 sidecarGrpcChannel = this .managedSidecarChannel ;
8591 }
8692
93+ this .tracer = GlobalOpenTelemetry .getTracer ("dapr-workflow" );
94+
8795 this .sidecarClient = TaskHubSidecarServiceGrpc .newBlockingStub (sidecarGrpcChannel );
8896 this .dataConverter = builder .dataConverter != null ? builder .dataConverter : new JacksonDataConverter ();
8997 this .maximumTimerInterval = builder .maximumTimerInterval != null ? builder .maximumTimerInterval
9098 : DEFAULT_MAXIMUM_TIMER_INTERVAL ;
91- this .workerPool = builder .executorService != null ? builder .executorService : Executors .newCachedThreadPool ();
99+
100+ ExecutorService rawExecutor = builder .executorService != null
101+ ? builder .executorService : Executors .newCachedThreadPool ();
102+ this .workerPool = Context .taskWrapping (rawExecutor );
103+
92104 this .isExecutorServiceManaged = builder .executorService == null ;
105+
106+
93107 }
94108
95109 /**
@@ -164,114 +178,25 @@ public void startAndBlock() {
164178 while (workItemStream .hasNext ()) {
165179 OrchestratorService .WorkItem workItem = workItemStream .next ();
166180 OrchestratorService .WorkItem .RequestCase requestType = workItem .getRequestCase ();
181+
167182 if (requestType == OrchestratorService .WorkItem .RequestCase .ORCHESTRATORREQUEST ) {
168183 OrchestratorService .OrchestratorRequest orchestratorRequest = workItem .getOrchestratorRequest ();
169-
170184 logger .log (Level .FINEST ,
171185 String .format ("Processing orchestrator request for instance: {0}" ,
172186 orchestratorRequest .getInstanceId ()));
173187
174- // TODO: Error handling
175- this .workerPool .submit (() -> {
176- TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor .execute (
177- orchestratorRequest .getPastEventsList (),
178- orchestratorRequest .getNewEventsList ());
179-
180- var versionBuilder = OrchestratorService .OrchestrationVersion .newBuilder ();
181-
182- if (StringUtils .isNotEmpty (taskOrchestratorResult .getVersion ())) {
183- versionBuilder .setName (taskOrchestratorResult .getVersion ());
184- }
185-
186- if (taskOrchestratorResult .getPatches () != null ) {
187- versionBuilder .addAllPatches (taskOrchestratorResult .getPatches ());
188- }
189-
190- OrchestratorService .OrchestratorResponse response = OrchestratorService .OrchestratorResponse .newBuilder ()
191- .setInstanceId (orchestratorRequest .getInstanceId ())
192- .addAllActions (taskOrchestratorResult .getActions ())
193- .setCustomStatus (StringValue .of (taskOrchestratorResult .getCustomStatus ()))
194- .setCompletionToken (workItem .getCompletionToken ())
195- .setVersion (versionBuilder )
196- .build ();
197-
198- try {
199- this .sidecarClient .completeOrchestratorTask (response );
200- logger .log (Level .FINEST ,
201- "Completed orchestrator request for instance: {0}" ,
202- orchestratorRequest .getInstanceId ());
203- } catch (StatusRuntimeException e ) {
204- if (e .getStatus ().getCode () == Status .Code .UNAVAILABLE ) {
205- logger .log (Level .WARNING ,
206- "The sidecar at address {0} is unavailable while completing the orchestrator task." ,
207- this .getSidecarAddress ());
208- } else if (e .getStatus ().getCode () == Status .Code .CANCELLED ) {
209- logger .log (Level .WARNING ,
210- "Durable Task worker has disconnected from {0} while completing the orchestrator task." ,
211- this .getSidecarAddress ());
212- } else {
213- logger .log (Level .WARNING ,
214- "Unexpected failure completing the orchestrator task at {0}." ,
215- this .getSidecarAddress ());
216- }
217- }
218- });
188+ this .workerPool .submit (new OrchestratorRunner (workItem , taskOrchestrationExecutor , sidecarClient , tracer ));
219189 } else if (requestType == OrchestratorService .WorkItem .RequestCase .ACTIVITYREQUEST ) {
220190 OrchestratorService .ActivityRequest activityRequest = workItem .getActivityRequest ();
221- logger .log (Level .FINEST ,
222- String .format ("Processing activity request: %s for instance: %s}" ,
223- activityRequest .getName (),
224- activityRequest .getOrchestrationInstance ().getInstanceId ()));
225-
226- // TODO: Error handling
227- this .workerPool .submit (() -> {
228- String output = null ;
229- TaskFailureDetails failureDetails = null ;
230- try {
231- output = taskActivityExecutor .execute (
191+
192+ logger .log (Level .INFO ,
193+ String .format ("Processing activity request: %s for instance: %s, gRPC thread context: %s" ,
232194 activityRequest .getName (),
233- activityRequest .getInput ().getValue (),
234- activityRequest .getTaskExecutionId (),
235- activityRequest .getTaskId ());
236- } catch (Throwable e ) {
237- failureDetails = TaskFailureDetails .newBuilder ()
238- .setErrorType (e .getClass ().getName ())
239- .setErrorMessage (e .getMessage ())
240- .setStackTrace (StringValue .of (FailureDetails .getFullStackTrace (e )))
241- .build ();
242- }
243-
244- OrchestratorService .ActivityResponse .Builder responseBuilder = OrchestratorService .ActivityResponse
245- .newBuilder ()
246- .setInstanceId (activityRequest .getOrchestrationInstance ().getInstanceId ())
247- .setTaskId (activityRequest .getTaskId ())
248- .setCompletionToken (workItem .getCompletionToken ());
249-
250- if (output != null ) {
251- responseBuilder .setResult (StringValue .of (output ));
252- }
253-
254- if (failureDetails != null ) {
255- responseBuilder .setFailureDetails (failureDetails );
256- }
257-
258- try {
259- this .sidecarClient .completeActivityTask (responseBuilder .build ());
260- } catch (StatusRuntimeException e ) {
261- if (e .getStatus ().getCode () == Status .Code .UNAVAILABLE ) {
262- logger .log (Level .WARNING ,
263- "The sidecar at address {0} is unavailable while completing the activity task." ,
264- this .getSidecarAddress ());
265- } else if (e .getStatus ().getCode () == Status .Code .CANCELLED ) {
266- logger .log (Level .WARNING ,
267- "Durable Task worker has disconnected from {0} while completing the activity task." ,
268- this .getSidecarAddress ());
269- } else {
270- logger .log (Level .WARNING , "Unexpected failure completing the activity task at {0}." ,
271- this .getSidecarAddress ());
272- }
273- }
274- });
195+ activityRequest .getOrchestrationInstance ().getInstanceId (),
196+ Context .current ()));
197+
198+ this .workerPool .submit (new ActivityRunner (workItem , taskActivityExecutor , sidecarClient , tracer ));
199+
275200 } else if (requestType == OrchestratorService .WorkItem .RequestCase .HEALTHPING ) {
276201 // No-op
277202 } else {
@@ -342,4 +267,57 @@ private void shutDownWorkerPool() {
342267 private String getSidecarAddress () {
343268 return this .sidecarClient .getChannel ().authority ();
344269 }
270+
271+ /**
272+ * Extracts trace context from the ActivityRequest's ParentTraceContext field
273+ * and creates an OpenTelemetry Context with the parent span set.
274+ *
275+ * @param activityRequest The activity request containing the parent trace context
276+ * @return A Context with the parent span set, or the current context if no trace context is present
277+ */
278+ private Context extractTraceContext (OrchestratorService .ActivityRequest activityRequest ) {
279+ if (!activityRequest .hasParentTraceContext ()) {
280+ logger .log (Level .FINE , "No parent trace context in activity request" );
281+ return Context .current ();
282+ }
283+
284+ OrchestratorService .TraceContext traceContext = activityRequest .getParentTraceContext ();
285+ String traceParent = traceContext .getTraceParent ();
286+
287+ if (traceParent .isEmpty ()) {
288+ logger .log (Level .FINE , "Empty traceparent in activity request" );
289+ return Context .current ();
290+ }
291+
292+ logger .log (Level .INFO ,
293+ String .format ("Extracting trace context from ActivityRequest: traceparent=%s" , traceParent ));
294+
295+ // Use W3CTraceContextPropagator to extract the trace context
296+ Map <String , String > carrier = new HashMap <>();
297+ carrier .put ("traceparent" , traceParent );
298+ if (traceContext .hasTraceState ()) {
299+ carrier .put ("tracestate" , traceContext .getTraceState ().getValue ());
300+ }
301+
302+ TextMapGetter <Map <String , String >> getter = new TextMapGetter <Map <String , String >>() {
303+ @ Override
304+ public Iterable <String > keys (Map <String , String > carrier ) {
305+ return carrier .keySet ();
306+ }
307+
308+ @ Override
309+ public String get (Map <String , String > carrier , String key ) {
310+ return carrier .get (key );
311+ }
312+ };
313+
314+
315+ Context extractedContext = W3CTraceContextPropagator .getInstance ()
316+ .extract (Context .current (), carrier , getter );
317+
318+ logger .log (Level .INFO ,
319+ String .format ("Extracted trace context: %s" , extractedContext ));
320+
321+ return extractedContext ;
322+ }
345323}
0 commit comments