In the first part, we looked at what Durable Functions does during instance startup. Now that we have it up and running, what happens when we tell it to run an orchestration?
When you create a new Durable Function in Visual Studio, you get this line of code that starts the orchestration:
// starter is an IDurableClient
string instanceId = await starter.StartNewAsync("Function1", null);
This is what we will focus on today.
What actually happens when you call IDurableClient.StartNewAsync?
Before we get to that, though, let's look at one more thing that is done during startup, as it will become relevant later in the article.
Extension registration and DurableTask middleware
The DurableTaskExtension class in the Durable Functions repository implements the IExtensionConfigProvider interface, which defines an Initialize method. During startup, the Functions host calls this method.
Initialize does quite a few things. For example, it registers the bindings for types like IDurableClient and triggers like [OrchestrationTrigger]. But the important bit today is the middleware it registers.
DispatchMiddlewarePipeline has a list of functions that it calls in sequence, and Initialize adds DurableTaskExtension.OrchestrationMiddleware to that list.
We will get back to this method once we get close to running the orchestrator function code.
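To make the pipeline idea concrete, here is a minimal Python sketch of middleware functions called in sequence. It is an analogy, not DurableTask's C# code: each middleware receives a context and a next callable, can do work before and after calling next, and a final handler runs at the end of the chain. The class MiddlewarePipeline and the middleware names are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

Context = Dict[str, str]
Next = Callable[[], Awaitable[None]]
Middleware = Callable[[Context, Next], Awaitable[None]]
Handler = Callable[[Context], Awaitable[None]]


class MiddlewarePipeline:
    """Hypothetical model of a dispatch middleware pipeline."""

    def __init__(self) -> None:
        self._middlewares: List[Middleware] = []

    def add(self, middleware: Middleware) -> None:
        self._middlewares.append(middleware)

    async def run(self, context: Context, handler: Handler) -> None:
        async def invoke(index: int) -> None:
            if index < len(self._middlewares):
                # Give this middleware a "next" that continues with the rest of the chain.
                await self._middlewares[index](context, lambda: invoke(index + 1))
            else:
                await handler(context)

        await invoke(0)


calls: List[str] = []


async def outer(ctx: Context, next_: Next) -> None:
    calls.append("outer:before")
    await next_()
    calls.append("outer:after")


async def inner(ctx: Context, next_: Next) -> None:
    # Hypothetical stand-in for a registered middleware such as OrchestrationMiddleware.
    calls.append(f"inner:{ctx['instance_id']}")
    await next_()


async def handler(ctx: Context) -> None:
    calls.append("handler")


pipeline = MiddlewarePipeline()
pipeline.add(outer)
pipeline.add(inner)
asyncio.run(pipeline.run({"instance_id": "abc"}, handler))
print(calls)  # ['outer:before', 'inner:abc', 'handler', 'outer:after']
```

Registration order decides call order, and an outer middleware regains control after everything registered after it has finished.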
Queue message listener
Now, since the durability provider in Durable Functions uses Storage Queues for messaging, we should find a loop somewhere that polls those queues for messages. And we do, in OrchestrationSessionManager.DequeueLoop.
But how do we get there? In part 1, we saw that the AppLeaseManager was started as part of instance startup. If this instance was able to acquire the application lease, it will have started its partition manager. Part of the partition manager's duties is acquiring "ownership" leases for the control queues. AzureStorageOrchestrationService sets up a callback that runs when such a lease is acquired.
The callback creates a ControlQueue and ensures that it exists. Then it adds the queue to the OrchestrationSessionManager, so that it can start listening for messages in the background through DequeueLoop.
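A rough Python model of that hand-off, assuming asyncio and in-memory stand-ins for Azure Storage: when a partition's ownership lease is acquired, the callback creates the control queue if it does not exist and registers it with the session manager, which starts a background dequeue task for it. InMemoryQueueService, SessionManager, on_lease_acquired and the queue name are all hypothetical; the real DequeueLoop polls Storage Queues rather than awaiting an in-memory queue.

```python
import asyncio
from typing import Dict, List, Set


class InMemoryQueueService:
    """Hypothetical stand-in for Azure Storage queues."""

    def __init__(self) -> None:
        self.queues: Dict[str, asyncio.Queue] = {}

    def create_if_not_exists(self, name: str) -> asyncio.Queue:
        # Idempotent, like "ensure the queue exists".
        return self.queues.setdefault(name, asyncio.Queue())


class SessionManager:
    """Hypothetical, heavily simplified OrchestrationSessionManager."""

    def __init__(self) -> None:
        self.listening: Set[str] = set()
        self.received: List[str] = []
        self._tasks: List[asyncio.Task] = []

    def add_queue(self, name: str, queue: asyncio.Queue) -> None:
        if name in self.listening:
            return
        self.listening.add(name)
        # Start the dequeue loop in the background; do not await it here.
        self._tasks.append(asyncio.create_task(self._dequeue_loop(queue)))

    async def _dequeue_loop(self, queue: asyncio.Queue) -> None:
        while True:
            message = await queue.get()
            self.received.append(message)

    async def stop(self) -> None:
        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)


def on_lease_acquired(partition: str, queues: InMemoryQueueService,
                      sessions: SessionManager) -> None:
    """Callback run when this instance acquires ownership of a partition."""
    control_queue = queues.create_if_not_exists(partition)
    sessions.add_queue(partition, control_queue)


async def main() -> List[str]:
    queues, sessions = InMemoryQueueService(), SessionManager()
    on_lease_acquired("myhub-control-00", queues, sessions)  # illustrative name
    await queues.queues["myhub-control-00"].put("ExecutionStarted")
    await asyncio.sleep(0.01)  # let the background loop run
    await sessions.stop()
    return sessions.received


received = asyncio.run(main())
print(received)  # ['ExecutionStarted']
```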
Sending the orchestration start message
Let's start the orchestration now, shall we? Here's the code responsible for doing that again:
// starter is an IDurableClient
string instanceId = await starter.StartNewAsync("Function1", null);
At a high level, several things happen when StartNewAsync is called. Let's walk through them step by step.
The DurableClient class does some validation, and the call ends up in TaskHubClient.InternalCreateOrchestrationInstanceWithRaisedEventAsync. An instance id is generated (Guid.NewGuid().ToString("N")) if not specified. The input object is serialized, and then the method calls into AzureStorageOrchestrationService.CreateTaskOrchestrationAsync.
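For comparison, Python's standard library produces the same shape of id: uuid.uuid4().hex returns a random GUID as 32 lowercase hexadecimal digits without hyphens, like .NET's Guid.NewGuid().ToString("N"). The helper resolve_instance_id is hypothetical and only models the "generate an id if none was given" behavior.

```python
import uuid
from typing import Optional


def resolve_instance_id(instance_id: Optional[str] = None) -> str:
    """Return the caller's id, or a new one formatted like Guid.NewGuid().ToString("N")."""
    if instance_id:
        return instance_id
    return uuid.uuid4().hex  # 32 lowercase hex digits, no hyphens


print(resolve_instance_id("order-42"))  # order-42
print(len(resolve_instance_id()))       # 32
```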
So what does CreateTaskOrchestrationAsync do? First, it ensures that the task hub exists through EnsureTaskHubAsync. Since this was already done during startup (see part 1), the method does nothing this time.
Next, DurableTask checks that there isn't another orchestration with the same instance id already in a conflicting state. Since you can specify the instance id manually, trying to start an orchestration with the same id as one that is already running leads to an error here.
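The check can be sketched as follows, assuming an in-memory dict from instance id to runtime status. Treating Pending and Running as the conflicting states is an assumption for illustration; the exact rules DurableTask applies are not reproduced here, and the names check_no_conflict and InstanceAlreadyExistsError are hypothetical.

```python
from typing import Dict, FrozenSet

# Assumption for illustration: statuses that block a new start with the same id.
CONFLICTING_STATUSES: FrozenSet[str] = frozenset({"Pending", "Running"})


class InstanceAlreadyExistsError(Exception):
    pass


def check_no_conflict(instances: Dict[str, str], instance_id: str) -> None:
    """Raise if an existing instance with this id is in a conflicting state."""
    status = instances.get(instance_id)
    if status in CONFLICTING_STATUSES:
        raise InstanceAlreadyExistsError(
            f"An instance with id '{instance_id}' is already {status}.")


instances = {"order-42": "Running", "order-7": "Completed"}
check_no_conflict(instances, "order-7")   # allowed under the assumed rules
check_no_conflict(instances, "order-99")  # allowed: no such instance
try:
    check_no_conflict(instances, "order-42")
except InstanceAlreadyExistsError as error:
    print(error)  # An instance with id 'order-42' is already Running.
```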
Then a message that will start the orchestration is sent to a control queue. The queue is chosen by passing the orchestration instance id through the Fowler-Noll-Vo (FNV-1a) hash function and taking the result modulo the partition count. In pseudocode, that is something like this:
int queueIndex = Fnv1a(instanceId) % partitionCount;
This is to ensure good distribution among the partitions. Since each queue can have only one listener, it is critical for scaling that all queues are used evenly.
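Here is a runnable Python version of that pseudocode, using the standard 32-bit FNV-1a parameters (offset basis 2166136261, prime 16777619). Hashing the UTF-8 bytes of the instance id and the name control_queue_index are assumptions; DurableTask's own C# helper may differ in such details, so the indexes computed here are not guaranteed to match the real queue assignment.

```python
FNV_OFFSET_BASIS_32 = 2166136261
FNV_PRIME_32 = 16777619


def fnv1a_32(data: bytes) -> int:
    """32-bit FNV-1a: XOR each byte into the hash, then multiply by the prime."""
    h = FNV_OFFSET_BASIS_32
    for byte in data:
        h ^= byte
        h = (h * FNV_PRIME_32) & 0xFFFFFFFF  # keep it a 32-bit unsigned value
    return h


def control_queue_index(instance_id: str, partition_count: int) -> int:
    """Map an instance id to a control queue index in [0, partition_count)."""
    return fnv1a_32(instance_id.encode("utf-8")) % partition_count


print(hex(fnv1a_32(b"a")))                    # 0xe40c292c
print(control_queue_index("my-instance", 4))  # an index from 0 to 3
```

Because the hash is deterministic, every message for a given instance id lands in the same control queue.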
Finally, a row containing the orchestration's status is added to the instances table.
Receiving the message and preparing it for processing
Now that we have a message in a control queue, we can return to OrchestrationSessionManager.DequeueLoop and see what happens when it gets a message.
Small disclaimer: while this will look complicated, in reality it is even more complicated. We are going to skip the parts where messages are added to existing sessions. Sessions in DurableTask's Azure Storage implementation are in-memory objects that hold, for example, the current state of an orchestration.
Normally, when all messages for an orchestration have been processed, its session is removed from memory. As far as I understand, there is only one scenario where messages would get added to an existing session: extended sessions are enabled and the session is still in memory. Since everything is asynchronous, though, there might be other situations as well.
Since we are just starting the orchestration, we won't have a session in memory.
Let's look at a simplified view of the dequeue loop.
ControlQueue.GetMessagesAsync returns the message sent earlier. Messages are always received here in batches (32 messages by default, which is also the maximum).
Next, OrchestrationSessionManager.DedupeExecutionStartedMessagesAsync checks that every ExecutionStarted message in the batch has a corresponding row in the instances table, and discards redundant messages. Since the queue message is sent before the instances table row is written, a race condition is possible where the queue message is received first. So if the row is still missing, the message is abandoned: it is put back in the queue to be retried later.
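The receive-and-check step can be modeled like this. It is a simplified Python sketch under stated assumptions: messages are dicts, the instances table is a dict from instance id to the execution id of its latest start, at most 32 messages are taken per batch, a start message whose row is missing is put back for a retry, and one whose execution id does not match the row is treated as redundant and dropped. The function names and that matching rule are hypothetical, not DurableTask's actual logic.

```python
from collections import deque
from typing import Deque, Dict, List

MAX_BATCH_SIZE = 32  # the default/maximum batch size mentioned above

Message = Dict[str, str]


def receive_batch(queue: Deque[Message]) -> List[Message]:
    """Take up to MAX_BATCH_SIZE messages from the front of the queue."""
    batch = []
    while queue and len(batch) < MAX_BATCH_SIZE:
        batch.append(queue.popleft())
    return batch


def dedupe_start_messages(batch: List[Message], instances: Dict[str, str],
                          queue: Deque[Message]) -> List[Message]:
    """Return messages safe to process; put back ones whose instance row is missing."""
    ready = []
    for message in batch:
        if message["type"] != "ExecutionStarted":
            ready.append(message)
            continue
        execution_id = instances.get(message["instance_id"])
        if execution_id is None:
            # Race: the queue message arrived before the instances row was written.
            queue.append(message)  # "abandon" it so it is retried later
        elif execution_id == message["execution_id"]:
            ready.append(message)
        # else: assumed redundant (the row belongs to another start), so it is dropped
    return ready


queue: Deque[Message] = deque([
    {"type": "ExecutionStarted", "instance_id": "a", "execution_id": "1"},
    {"type": "ExecutionStarted", "instance_id": "b", "execution_id": "1"},
    {"type": "ExecutionStarted", "instance_id": "a", "execution_id": "0"},
])
instances = {"a": "1"}  # the row for "b" has not been written yet
ready = dedupe_start_messages(receive_batch(queue), instances, queue)
print([m["instance_id"] for m in ready])  # ['a']
print(len(queue))                         # 1 (the "b" message, waiting for a retry)
```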
OrchestrationSessionManager.AddMessageToPendingOrchestration organizes the received messages into batches and kicks off a background pre-fetch of the orchestration state from the history table. Messages are batched by their instance id and execution id pair.
A short explanation of those ids:
- Instance id: stays the same across replays and ContinueAsNew executions
- Execution id: stays the same across replays but is different for ContinueAsNew executions
The same orchestration instance with another execution id is also called a "different generation" within DurableTask.
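Grouping by that pair can be sketched with a dictionary keyed by (instance_id, execution_id). The dict-based messages and the function name group_into_batches are hypothetical; the real code works with DurableTask message objects.

```python
from collections import defaultdict
from typing import DefaultDict, Dict, List, Tuple

Message = Dict[str, str]
BatchKey = Tuple[str, str]  # (instance id, execution id)


def group_into_batches(messages: List[Message]) -> Dict[BatchKey, List[Message]]:
    """Group messages per orchestration generation, preserving arrival order."""
    batches: DefaultDict[BatchKey, List[Message]] = defaultdict(list)
    for message in messages:
        key = (message["instance_id"], message["execution_id"])
        batches[key].append(message)
    return dict(batches)


messages = [
    {"instance_id": "order-42", "execution_id": "e1", "event": "ExecutionStarted"},
    {"instance_id": "order-7", "execution_id": "e9", "event": "EventRaised"},
    {"instance_id": "order-42", "execution_id": "e1", "event": "EventRaised"},
    {"instance_id": "order-42", "execution_id": "e2", "event": "ExecutionStarted"},
]
for key, batch in group_into_batches(messages).items():
    print(key, [m["event"] for m in batch])
```

The last message has the same instance id as the first but a different execution id, i.e. it belongs to a different generation, so it ends up in a batch of its own.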
Once the state pre-fetch in OrchestrationSessionManager.ScheduleOrchestrationStatePrefetch is done, the message batch is added to the readyForProcessingQueue.
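The prefetch-then-enqueue step can be sketched with asyncio: state loading for a batch runs as a background task, and the batch is only put on the ready queue once its state has been loaded. load_history is a hypothetical stand-in for reading the history table, and the asyncio.Queue plays the role of readyForProcessingQueue.

```python
import asyncio
from typing import List, Tuple


async def load_history(instance_id: str) -> List[str]:
    """Hypothetical stand-in for reading an orchestration's history rows."""
    await asyncio.sleep(0.01)  # simulate storage latency
    return []  # a brand-new orchestration has no history yet


async def prefetch_then_enqueue(instance_id: str, batch: List[str],
                                ready: asyncio.Queue) -> None:
    history = await load_history(instance_id)
    # Only batches whose state has been loaded become visible to the dispatcher.
    await ready.put((instance_id, history, batch))


async def main() -> Tuple[str, List[str], List[str]]:
    ready: asyncio.Queue = asyncio.Queue()
    # Kick off the prefetch in the background instead of awaiting it here.
    task = asyncio.create_task(
        prefetch_then_enqueue("order-42", ["ExecutionStarted"], ready))
    item = await ready.get()  # waits until the prefetch has finished
    await task
    return item


print(asyncio.run(main()))  # ('order-42', [], ['ExecutionStarted'])
```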
Work item dispatcher
As part of instance startup, two WorkItemDispatchers are started: one for orchestration work items and one for activity work items. Here is how the orchestration dispatcher starts getting message batches from OrchestrationSessionManager's readyForProcessingQueue, and how they get handled. The flow is a bit tangled, since TaskOrchestrationDispatcher and WorkItemDispatcher call each other.
WorkItemDispatcher.StartAsync calls DispatchAsync in a fire-and-forget manner: it starts running it, but does not wait for it to finish.
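In asyncio terms, fire-and-forget roughly corresponds to asyncio.create_task: the coroutine starts running in the background and the caller continues without awaiting it. This is a Python analogy for the C# pattern, with a hypothetical Dispatcher class. It keeps a reference to the task because the asyncio event loop only holds weak references to tasks, so an unreferenced task could be garbage-collected mid-run.

```python
import asyncio
from typing import List, Optional


class Dispatcher:
    """Hypothetical model of a dispatcher whose start() does not wait for its loop."""

    def __init__(self) -> None:
        self.log: List[str] = []
        self._loop_task: Optional[asyncio.Task] = None

    async def start(self) -> None:
        # Fire-and-forget: schedule dispatch() and return immediately.
        self._loop_task = asyncio.create_task(self.dispatch())
        self.log.append("start returned")

    async def dispatch(self) -> None:
        self.log.append("dispatch loop running")
        await asyncio.sleep(3600)  # stands in for an endless fetch loop

    async def stop(self) -> None:
        if self._loop_task is not None:
            self._loop_task.cancel()
            await asyncio.gather(self._loop_task, return_exceptions=True)


async def main() -> List[str]:
    dispatcher = Dispatcher()
    await dispatcher.start()
    await asyncio.sleep(0)  # yield once so the background task gets to run
    await dispatcher.stop()
    return dispatcher.log


print(asyncio.run(main()))  # ['start returned', 'dispatch loop running']
```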
DispatchAsync starts a loop that gets work through the FetchWorkItem callback. This callback is linked to TaskOrchestrationDispatcher.OnFetchWorkItemAsync, through which the call reaches OrchestrationSessionManager.GetNextSessionAsync.
Now we finally get to dequeue a pending message batch from readyForProcessingQueue. A new session is created and returned. Then AzureStorageOrchestrationService.LockNextTaskOrchestrationWorkItemAsync creates and returns a TaskOrchestrationWorkItem.
DispatchAsync then runs WorkItemDispatcher.ProcessWorkItemAsync in a similar fire-and-forget way, using Task.Run. This leads to a chain of method calls that goes through DispatchMiddlewarePipeline (which contains the DurableTaskExtension.OrchestrationMiddleware registered at startup) and FunctionExecutionHelper, finally using ITriggeredFunctionExecutor.TryExecuteAsync to call your orchestrator Function!
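The fetch-then-process loop can be sketched in Python as follows. It is a simplified model, not WorkItemDispatcher itself: a fetch callable returns the next work item, each item is processed in a background task (playing the role of Task.Run), and returning None is a hypothetical shutdown signal used only to end this sketch. WorkItem, dispatch_loop, fetch and process are illustrative names.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List, Optional, Set


@dataclass
class WorkItem:
    """Hypothetical, minimal stand-in for TaskOrchestrationWorkItem."""
    instance_id: str
    new_messages: List[str]


async def dispatch_loop(fetch: Callable[[], Awaitable[Optional[WorkItem]]],
                        process: Callable[[WorkItem], Awaitable[None]]) -> None:
    """Fetch work items one by one and process each in the background."""
    in_flight: Set[asyncio.Task] = set()
    while True:
        work_item = await fetch()
        if work_item is None:  # hypothetical shutdown signal for this sketch
            break
        task = asyncio.create_task(process(work_item))  # not awaited, like Task.Run
        in_flight.add(task)
        task.add_done_callback(in_flight.discard)
    await asyncio.gather(*in_flight)  # drain remaining work before returning


async def main() -> List[str]:
    ready: asyncio.Queue = asyncio.Queue()
    for entry in [("order-42", ["ExecutionStarted"]), None]:
        ready.put_nowait(entry)
    processed: List[str] = []

    async def fetch() -> Optional[WorkItem]:
        entry = await ready.get()  # like dequeuing from readyForProcessingQueue
        return None if entry is None else WorkItem(*entry)

    async def process(work_item: WorkItem) -> None:
        processed.append(work_item.instance_id)  # the orchestrator would run here

    await dispatch_loop(fetch, process)
    return processed


print(asyncio.run(main()))  # ['order-42']
```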
Summary
Quite a bit happens behind the scenes just to kick off an orchestration! At a high level, simplified:
- IDurableClient.StartNewAsync called
- Message sent to control queue + row added in instances table
- Message receive loop receives the message and organizes all received messages into batches, history pre-fetch started
- Message batches added to an in-memory queue
- The message batch is picked up by another receive loop
- Session and work item created
- Durable Functions extension runs the actual orchestration trigger Function
Next time we will look at how the orchestrator runs and queues up an activity to execute.