Last time in part 2,
we finished our deep dive at WorkItemDispatcher.ProcessWorkItemAsync
after processing a message from a control queue to start the orchestrator.
Now, let's see what happens behind the scenes to run an activity.
At a high level:
TaskOrchestrationDispatcher.ExecuteOrchestrationAsync
calls intoTaskOrchestrationExecutor.Execute
inside the dispatcher pipeline (source code)TaskOrchestrationExecutor
goes through all "past events" (events that have been played on a previous execution), followed by the "new events" (events that have not been played yet) (source code)- Before processing past events it sets
IsReplaying
to true on the context - After that
IsReplaying
is set to false and the new events are processed - You can see what is done for each event in the switch case block
- Before processing past events it sets
- The first event is an "OrchestratorStarted" event, its timestamp is used to set the
CurrentUtcDateTime
value on the orchestration context (source code) - The second event is an "ExecutionStarted" event, processing this event runs your orchestrator function (source code)
- Crucially, it does not await the Task returned so we can continue processing events
- When an activity is called through the orchestration context, a TaskCompletionSource is created, stored in the context and it gets awaited (source code)
- If event processing finishes with 0 "open tasks", the orchestration is complete (source code)
- An open task can refer to a sub-orchestrator start, activity start or timer start that didn't have a corresponding completed/failed event (i.e. the workflow is waiting for something)
- If there are open tasks, "decisions" are built from them and passed to the durability provider to continue the process (source code)
- Once the activity is done, the previous events will be replayed, and we will have a new TaskCompleted event, which is used to set the result for the TaskCompletionSource, which then causes the orchestration function to continue execution past the previous point (source code)
As an example for the event processing, we will use these orchestrator and activity functions:
[FunctionName("TestOrchestration")]
public static async Task TestOrchestration(
[OrchestrationTrigger] IDurableOrchestrationContext context,
ILogger logger)
{
var task = context.CallActivityAsync<string>("TestActivity", "Blog");
var result = await task;
logger.LogInformation($"Result: {result}");
}
[FunctionName("TestActivity")]
public static string TestActivity(
[ActivityTrigger] string input)
{
return $"Hello {input}!";
}
On first run, these are the processed events:
- Past events: none
- New events:
- Orchestrator started
- Execution started
There are no past events since this is the first run. As mentioned above, the execution started event causes the orchestrator to run. So we run the following code:
var task = context.CallActivityAsync<string>("TestActivity", "Blog");
It goes through a call chain that ends at TaskOrchestrationContext.ScheduleTaskInternal, that does a few things:
- Creates a
ScheduleTaskOrchestratorAction
and adds it toorchestratorActionsMap
- This is used to kick off the activity later
- Creates a
TaskCompletionSource<string>
and adds that along with some other data toopenTasks
- It being in openTasks at the end of event processing indicates that the orchestration is not complete
- Awaits the TaskCompletionSource's Task
- This results in the orchestrator function exiting with a pending Task
After that, there are no more events to process.
Since there is an open task, Durable Task knows the orchestration is not complete.
The ScheduleTaskOrchestratorAction
instructs Durable Task to schedule an activity.
In case you are curious, there are 5 possible actions:
public enum OrchestratorActionType
{
/// <summary>
/// A new task was scheduled by the orchestrator.
/// </summary>
ScheduleOrchestrator,
/// <summary>
/// A sub-orchestration was scheduled by the orchestrator.
/// </summary>
CreateSubOrchestration,
/// <summary>
/// A timer was scheduled by the orchestrator.
/// </summary>
CreateTimer,
/// <summary>
/// An outgoing external event was scheduled by the orchestrator.
/// </summary>
SendEvent,
/// <summary>
/// The orchestrator completed.
/// </summary>
OrchestrationComplete,
}
This action ultimately becomes a "decision" in Durable Task to schedule an activity.
This then ends up at the durability provider's CompleteTaskOrchestrationWorkItemAsync
method.
In the Azure Storage provider case, this sends a queue message to the work item queue (source code).
We will look at activity execution in more detail next time.
Once the activity completes, these are the processed events:
- Past events:
- Orchestrator started
- Execution started
- Task scheduled (activity start)
- Orchestrator completed
- New events:
- Orchestrator started
- Task completed (activity complete)
Note that we have events in past events that we did not technically play last time. They are just a record of things that were done as a result of the previous run.
Now we again process the execution started event which causes this to run again:
var task = context.CallActivityAsync<string>("TestActivity", "Blog");
This does the exact same thing it did previously. However, this time we have additional events to process.
The task scheduled event causes the entry in orchestratorActionsMap
to be removed (source code).
Removing the action means Durable Task won't kick off the activity again.
The orchestrator completed event does absolutely nothing here.
The second orchestrator started event's timestamp will reset the value in the orchestration context's CurrentUtcDateTime
property.
This is done so that the CurrentUtcDateTime
value is closer to the actual time while still being deterministic during replays.
The task completed event then sets the result on the TaskCompletionSource and removes the entry in openTasks
(source code).
Setting the result causes the orchestrator code to continue due to the default behaviour of Tasks/TaskCompletionSource.
Since continuations run synchronously by default, calling SetResult
runs the next part of the orchestrator inside it:
var result = await task;
logger.LogInformation($"Result: {result}");
Since there are no entries in openTasks
, the orchestration is complete.
There is an additional check for a failed execution in TaskOrchestrationExecutor.ExecuteCore
since if an exception was thrown, it could also be in a state where there are no open tasks.
In that case the orchestration would be marked as failed.
Summary
The part this time might be my favourite as it gets to the part of Durable Task that makes it tick. Hopefully this proves useful in understanding how Durable Task replays your code. Next time we will look shortly at how activities are executed.