Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ public void startAndBlock() {
OrchestratorService.WorkItem workItem = workItemStream.next();
OrchestratorService.WorkItem.RequestCase requestType = workItem.getRequestCase();

if (requestType == OrchestratorService.WorkItem.RequestCase.ORCHESTRATORREQUEST) {
OrchestratorService.OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest();
if (requestType == OrchestratorService.WorkItem.RequestCase.WORKFLOWREQUEST) {
OrchestratorService.WorkflowRequest orchestratorRequest = workItem.getWorkflowRequest();
logger.log(Level.FINEST,
String.format("Processing orchestrator request for instance: {0}",
orchestratorRequest.getInstanceId()));
Expand All @@ -193,7 +193,7 @@ public void startAndBlock() {
logger.log(Level.INFO,
String.format("Processing activity request: %s for instance: %s, gRPC thread context: %s",
activityRequest.getName(),
activityRequest.getOrchestrationInstance().getInstanceId(),
activityRequest.getWorkflowInstance().getInstanceId(),
Context.current()));

this.workerPool.submit(new ActivityRunner(workItem, taskActivityExecutor, sidecarClient, tracer));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

package io.dapr.durabletask;

import io.dapr.durabletask.implementation.protobuf.Orchestration.OrchestrationState;
import io.dapr.durabletask.implementation.protobuf.Orchestration.WorkflowState;
import io.dapr.durabletask.implementation.protobuf.OrchestratorService;

import java.time.Instant;
Expand Down Expand Up @@ -45,19 +45,19 @@ public final class OrchestrationMetadata {
OrchestratorService.GetInstanceResponse fetchResponse,
DataConverter dataConverter,
boolean requestedInputsAndOutputs) {
this(fetchResponse.getOrchestrationState(), dataConverter, requestedInputsAndOutputs);
this(fetchResponse.getWorkflowState(), dataConverter, requestedInputsAndOutputs);
}

OrchestrationMetadata(
OrchestrationState state,
WorkflowState state,
DataConverter dataConverter,
boolean requestedInputsAndOutputs) {
this.dataConverter = dataConverter;
this.requestedInputsAndOutputs = requestedInputsAndOutputs;

this.name = state.getName();
this.instanceId = state.getInstanceId();
this.runtimeStatus = OrchestrationRuntimeStatus.fromProtobuf(state.getOrchestrationStatus());
this.runtimeStatus = OrchestrationRuntimeStatus.fromProtobuf(state.getWorkflowStatus());
this.createdAt = DataConverter.getInstantFromTimestamp(state.getCreatedTimestamp());
this.lastUpdatedAt = DataConverter.getInstantFromTimestamp(state.getLastUpdatedTimestamp());
this.serializedInput = state.getInput().getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ public static byte[] loadAndRun(byte[] orchestratorRequestBytes, TaskOrchestrati
throw new IllegalArgumentException("orchestration must not be null");
}

OrchestratorService.OrchestratorRequest orchestratorRequest;
OrchestratorService.WorkflowRequest orchestratorRequest;
try {
orchestratorRequest = OrchestratorService.OrchestratorRequest.parseFrom(orchestratorRequestBytes);
orchestratorRequest = OrchestratorService.WorkflowRequest.parseFrom(orchestratorRequestBytes);
} catch (InvalidProtocolBufferException e) {
throw new IllegalArgumentException("triggerStateProtoBytes was not valid protobuf", e);
}
Expand Down Expand Up @@ -170,7 +170,7 @@ public Boolean isLatestVersion() {
orchestratorRequest.getPastEventsList(),
orchestratorRequest.getNewEventsList());

OrchestratorService.OrchestratorResponse response = OrchestratorService.OrchestratorResponse.newBuilder()
OrchestratorService.WorkflowResponse response = OrchestratorService.WorkflowResponse.newBuilder()
.setInstanceId(orchestratorRequest.getInstanceId())
.addAllActions(taskOrchestratorResult.getActions())
.setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private class ContextImplTask implements TaskOrchestrationContext {
private String appId;

// LinkedHashMap to maintain insertion order when returning the list of pending actions
private final Map<Integer, OrchestratorActions.OrchestratorAction> pendingActions = new LinkedHashMap<>();
private final Map<Integer, OrchestratorActions.WorkflowAction> pendingActions = new LinkedHashMap<>();
private final Map<Integer, TaskRecord<?>> openTasks = new HashMap<>();
private final Map<String, Queue<TaskRecord<?>>> outstandingEvents = new LinkedHashMap<>();
private final List<HistoryEvents.HistoryEvent> unprocessedEvents = new LinkedList<>();
Expand Down Expand Up @@ -368,7 +368,7 @@ public <V> Task<V> callActivity(

TaskFactory<V> taskFactory = () -> {
int id = this.sequenceNumber++;
OrchestratorActions.OrchestratorAction.Builder actionBuilder = OrchestratorActions.OrchestratorAction
OrchestratorActions.WorkflowAction.Builder actionBuilder = OrchestratorActions.WorkflowAction
.newBuilder()
.setId(id)
.setScheduleTask(scheduleTaskBuilder);
Expand Down Expand Up @@ -463,16 +463,16 @@ public void sendEvent(String instanceId, String eventName, Object eventData) {

int id = this.sequenceNumber++;
String serializedEventData = this.dataConverter.serialize(eventData);
Orchestration.OrchestrationInstance.Builder orchestrationInstanceBuilder =
Orchestration.OrchestrationInstance.newBuilder()
Orchestration.WorkflowInstance.Builder orchestrationInstanceBuilder =
Orchestration.WorkflowInstance.newBuilder()
.setInstanceId(instanceId);
OrchestratorActions.SendEventAction.Builder builder = OrchestratorActions
.SendEventAction.newBuilder().setInstance(orchestrationInstanceBuilder)
.setName(eventName);
if (serializedEventData != null) {
builder.setData(StringValue.of(serializedEventData));
}
OrchestratorActions.OrchestratorAction.Builder actionBuilder = OrchestratorActions.OrchestratorAction.newBuilder()
OrchestratorActions.WorkflowAction.Builder actionBuilder = OrchestratorActions.WorkflowAction.newBuilder()
.setId(id)
.setSendEvent(builder);

Expand Down Expand Up @@ -505,8 +505,8 @@ public <V> Task<V> callSubOrchestrator(
}

String serializedInput = this.dataConverter.serialize(input);
OrchestratorActions.CreateSubOrchestrationAction.Builder createSubOrchestrationActionBuilder =
OrchestratorActions.CreateSubOrchestrationAction
OrchestratorActions.CreateChildWorkflowAction.Builder createSubOrchestrationActionBuilder =
OrchestratorActions.CreateChildWorkflowAction
.newBuilder().setName(name);
if (serializedInput != null) {
createSubOrchestrationActionBuilder.setInput(StringValue.of(serializedInput));
Expand Down Expand Up @@ -535,10 +535,10 @@ public <V> Task<V> callSubOrchestrator(

TaskFactory<V> taskFactory = () -> {
int id = this.sequenceNumber++;
OrchestratorActions.OrchestratorAction.Builder actionBuilder = OrchestratorActions.OrchestratorAction
OrchestratorActions.WorkflowAction.Builder actionBuilder = OrchestratorActions.WorkflowAction
.newBuilder()
.setId(id)
.setCreateSubOrchestration(createSubOrchestrationActionBuilder);
.setCreateChildWorkflow(createSubOrchestrationActionBuilder);

// Set router on the OrchestratorAction for cross-app routing
if (hasSourceAppId()) {
Expand Down Expand Up @@ -641,7 +641,7 @@ private void handleTaskScheduled(HistoryEvents.HistoryEvent e) {
// The history shows that this orchestrator created a durable task in a previous execution.
// We can therefore remove it from the map of pending actions. If we can't find the pending
// action, then we assume a non-deterministic code violation in the orchestrator.
OrchestratorActions.OrchestratorAction taskAction = this.pendingActions.remove(taskId);
OrchestratorActions.WorkflowAction taskAction = this.pendingActions.remove(taskId);
if (taskAction == null) {
String message = String.format(
"Non-deterministic orchestrator detected: a history event scheduling an activity task with sequence "
Expand Down Expand Up @@ -797,7 +797,7 @@ private Task<Void> createTimer(String name, Instant finalFireAt) {

private CompletableTask<Void> createInstantTimer(String name, int id, Instant fireAt) {
Timestamp ts = DataConverter.getTimestampFromInstant(fireAt);
this.pendingActions.put(id, OrchestratorActions.OrchestratorAction.newBuilder()
this.pendingActions.put(id, OrchestratorActions.WorkflowAction.newBuilder()
.setId(id)
.setCreateTimer(OrchestratorActions.CreateTimerAction.newBuilder()
.setName(name).setFireAt(ts))
Expand Down Expand Up @@ -825,7 +825,7 @@ private void handleTimerCreated(HistoryEvents.HistoryEvent e) {
// The history shows that this orchestrator created a durable timer in a previous execution.
// We can therefore remove it from the map of pending actions. If we can't find the pending
// action, then we assume a non-deterministic code violation in the orchestrator.
OrchestratorActions.OrchestratorAction timerAction = this.pendingActions.remove(timerEventId);
OrchestratorActions.WorkflowAction timerAction = this.pendingActions.remove(timerEventId);
if (timerAction == null) {
String message = String.format(
"Non-deterministic orchestrator detected: a history event creating a timer with ID %d and "
Expand Down Expand Up @@ -860,9 +860,9 @@ public void handleTimerFired(HistoryEvents.HistoryEvent e) {

private void handleSubOrchestrationCreated(HistoryEvents.HistoryEvent e) {
int taskId = e.getEventId();
HistoryEvents.SubOrchestrationInstanceCreatedEvent subOrchestrationInstanceCreated =
e.getSubOrchestrationInstanceCreated();
OrchestratorActions.OrchestratorAction taskAction = this.pendingActions.remove(taskId);
HistoryEvents.ChildWorkflowInstanceCreatedEvent subOrchestrationInstanceCreated =
e.getChildWorkflowInstanceCreated();
OrchestratorActions.WorkflowAction taskAction = this.pendingActions.remove(taskId);
if (taskAction == null) {
String message = String.format(
"Non-deterministic orchestrator detected: a history event scheduling an sub-orchestration task "
Expand All @@ -876,8 +876,8 @@ private void handleSubOrchestrationCreated(HistoryEvents.HistoryEvent e) {
}

private void handleSubOrchestrationCompleted(HistoryEvents.HistoryEvent e) {
HistoryEvents.SubOrchestrationInstanceCompletedEvent subOrchestrationInstanceCompletedEvent =
e.getSubOrchestrationInstanceCompleted();
HistoryEvents.ChildWorkflowInstanceCompletedEvent subOrchestrationInstanceCompletedEvent =
e.getChildWorkflowInstanceCompleted();
int taskId = subOrchestrationInstanceCompletedEvent.getTaskScheduledId();
TaskRecord<?> record = this.openTasks.remove(taskId);
if (record == null) {
Expand Down Expand Up @@ -908,8 +908,8 @@ private void handleSubOrchestrationCompleted(HistoryEvents.HistoryEvent e) {
}

private void handleSubOrchestrationFailed(HistoryEvents.HistoryEvent e) {
HistoryEvents.SubOrchestrationInstanceFailedEvent subOrchestrationInstanceFailedEvent =
e.getSubOrchestrationInstanceFailed();
HistoryEvents.ChildWorkflowInstanceFailedEvent subOrchestrationInstanceFailedEvent =
e.getChildWorkflowInstanceFailed();
int taskId = subOrchestrationInstanceFailedEvent.getTaskScheduledId();
TaskRecord<?> record = this.openTasks.remove(taskId);
if (record == null) {
Expand Down Expand Up @@ -965,9 +965,9 @@ private void completeInternal(
Helpers.throwIfOrchestratorComplete(this.isComplete);


OrchestratorActions.CompleteOrchestrationAction.Builder builder = OrchestratorActions.CompleteOrchestrationAction
OrchestratorActions.CompleteWorkflowAction.Builder builder = OrchestratorActions.CompleteWorkflowAction
.newBuilder();
builder.setOrchestrationStatus(runtimeStatus);
builder.setWorkflowStatus(runtimeStatus);

if (rawOutput != null) {
builder.setResult(StringValue.of(rawOutput));
Expand All @@ -986,10 +986,10 @@ private void completeInternal(
}

int id = this.sequenceNumber++;
OrchestratorActions.OrchestratorAction.Builder actionBuilder = OrchestratorActions.OrchestratorAction
OrchestratorActions.WorkflowAction.Builder actionBuilder = OrchestratorActions.WorkflowAction
.newBuilder()
.setId(id)
.setCompleteOrchestration(builder.build());
.setCompleteWorkflow(builder.build());

// Add router to completion action for cross-app routing back to parent
if (hasSourceAppId()) {
Expand All @@ -1003,7 +1003,7 @@ private void completeInternal(
this.isComplete = true;
}

private void addCarryoverEvents(OrchestratorActions.CompleteOrchestrationAction.Builder builder) {
private void addCarryoverEvents(OrchestratorActions.CompleteWorkflowAction.Builder builder) {
// Add historyEvent in the unprocessedEvents buffer
// Add historyEvent in the new event list that haven't been added to the buffer.
// We don't check the event in the pass event list to avoid duplicated events.
Expand Down Expand Up @@ -1040,28 +1040,28 @@ private void processEvent(HistoryEvents.HistoryEvent e) {
} else {
this.logger.fine(() -> this.instanceId + ": Processing event: " + e.getEventTypeCase());
switch (e.getEventTypeCase()) {
case ORCHESTRATORSTARTED:
case WORKFLOWSTARTED:
Instant instant = DataConverter.getInstantFromTimestamp(e.getTimestamp());
this.setCurrentInstant(instant);

if (StringUtils.isNotEmpty(e.getOrchestratorStarted().getVersion().getName())) {
this.orchestratorVersionName = e.getOrchestratorStarted().getVersion().getName();
if (StringUtils.isNotEmpty(e.getWorkflowStarted().getVersion().getName())) {
this.orchestratorVersionName = e.getWorkflowStarted().getVersion().getName();
}
for (var patch : e.getOrchestratorStarted().getVersion().getPatchesList()) {
for (var patch : e.getWorkflowStarted().getVersion().getPatchesList()) {
this.historyPatches.put(patch, true);
}

this.logger.fine(() -> this.instanceId + ": Workflow orchestrator started");
break;
case ORCHESTRATORCOMPLETED:
case WORKFLOWCOMPLETED:
// No action needed
this.logger.fine(() -> this.instanceId + ": Workflow orchestrator completed");
break;
case EXECUTIONSTARTED:
HistoryEvents.ExecutionStartedEvent executionStarted = e.getExecutionStarted();
this.setName(executionStarted.getName());
this.setInput(executionStarted.getInput().getValue());
this.setInstanceId(executionStarted.getOrchestrationInstance().getInstanceId());
this.setInstanceId(executionStarted.getWorkflowInstance().getInstanceId());
this.logger.fine(() -> this.instanceId + ": Workflow execution started");
// For cross-app suborchestrations, if the router has a target, use that as our appID
// since that's where we're actually executing
Expand Down Expand Up @@ -1122,13 +1122,13 @@ private void processEvent(HistoryEvents.HistoryEvent e) {
case TIMERFIRED:
this.handleTimerFired(e);
break;
case SUBORCHESTRATIONINSTANCECREATED:
case CHILDWORKFLOWINSTANCECREATED:
this.handleSubOrchestrationCreated(e);
break;
case SUBORCHESTRATIONINSTANCECOMPLETED:
case CHILDWORKFLOWINSTANCECOMPLETED:
this.handleSubOrchestrationCompleted(e);
break;
case SUBORCHESTRATIONINSTANCEFAILED:
case CHILDWORKFLOWINSTANCEFAILED:
this.handleSubOrchestrationFailed(e);
break;
case EVENTRAISED:
Expand All @@ -1149,14 +1149,14 @@ private void processEvent(HistoryEvents.HistoryEvent e) {
public void setVersionNotRegistered() {
this.pendingActions.clear();

OrchestratorActions.CompleteOrchestrationAction.Builder builder = OrchestratorActions.CompleteOrchestrationAction
OrchestratorActions.CompleteWorkflowAction.Builder builder = OrchestratorActions.CompleteWorkflowAction
.newBuilder();
builder.setOrchestrationStatus(Orchestration.OrchestrationStatus.ORCHESTRATION_STATUS_STALLED);
builder.setWorkflowStatus(Orchestration.OrchestrationStatus.ORCHESTRATION_STATUS_STALLED);

int id = this.sequenceNumber++;
OrchestratorActions.OrchestratorAction action = OrchestratorActions.OrchestratorAction.newBuilder()
OrchestratorActions.WorkflowAction action = OrchestratorActions.WorkflowAction.newBuilder()
.setId(id)
.setCompleteOrchestration(builder.build())
.setCompleteWorkflow(builder.build())
.build();
this.pendingActions.put(id, action);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

public final class TaskOrchestratorResult {

private final Collection<OrchestratorActions.OrchestratorAction> actions;
private final Collection<OrchestratorActions.WorkflowAction> actions;

private final String customStatus;

Expand All @@ -37,15 +37,15 @@ public final class TaskOrchestratorResult {
* @param version the orchestrator version
* @param patches the patches to apply
*/
public TaskOrchestratorResult(Collection<OrchestratorActions.OrchestratorAction> actions,
public TaskOrchestratorResult(Collection<OrchestratorActions.WorkflowAction> actions,
String customStatus, String version, List<String> patches) {
this.actions = Collections.unmodifiableCollection(actions);
this.customStatus = customStatus;
this.version = version;
this.patches = patches;
}

public Collection<OrchestratorActions.OrchestratorAction> getActions() {
public Collection<OrchestratorActions.WorkflowAction> getActions() {
return this.actions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private void runWithTracing() {
.setParent(parentContext)
.setSpanKind(SpanKind.INTERNAL)
.setAttribute("durabletask.task.instance_id",
activityRequest.getOrchestrationInstance().getInstanceId())
activityRequest.getWorkflowInstance().getInstanceId())
.setAttribute("durabletask.task.id", activityRequest.getTaskId())
.setAttribute("durabletask.activity.name", activityRequest.getName())
.startSpan();
Expand Down Expand Up @@ -123,7 +123,7 @@ private void executeActivity() throws Throwable {

OrchestratorService.ActivityResponse.Builder responseBuilder = OrchestratorService.ActivityResponse
.newBuilder()
.setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId())
.setInstanceId(activityRequest.getWorkflowInstance().getInstanceId())
.setTaskId(activityRequest.getTaskId())
.setCompletionToken(workItem.getCompletionToken());

Expand Down
Loading
Loading