In the first part we looked at what Durable Functions does during instance startup. Now that we have it up and running, what will happen when we tell it to run an orchestration?

When you create a new Durable Function in Visual Studio, you get this line of code that starts the orchestration:

// starter is an IDurableClient
string instanceId = await starter.StartNewAsync("Function1", null);

This is what we will focus on today. What actually happens when you call IDurableClient.StartNewAsync? Before we can get to that though, let's look at one more thing that is done during startup that will become relevant later in the article.

Extension registration and DurableTask middleware

The DurableTaskExtension class in the Durable Functions repository implements the IExtensionConfigProvider interface, which defines an Initialize method. During startup, this method is called by the Functions host:

Durable Functions extension initialization sequence diagram

Initialize does quite a few other things too: it registers the bindings for types like IDurableClient and the triggers like [OrchestrationTrigger]. But the middleware registration is the important bit today.

DispatchMiddlewarePipeline has a list of functions it calls in sequence, and the above process adds DurableTaskExtension.OrchestrationMiddleware to that list. We will get back to this method once we get close to running the orchestrator function code.
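To make "a list of functions it calls in sequence" more concrete, here is a minimal sketch of such a pipeline. SketchPipeline and its dictionary-based context are hypothetical stand-ins, not the DurableTask types, and the sketch assumes the common shape where each middleware receives a context and a "next" delegate.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical, simplified stand-in for a dispatch middleware pipeline.
public sealed class SketchPipeline
{
    // Each middleware receives a context and a delegate that invokes the next step.
    private readonly List<Func<IDictionary<string, object>, Func<Task>, Task>> components =
        new List<Func<IDictionary<string, object>, Func<Task>, Task>>();

    public void Add(Func<IDictionary<string, object>, Func<Task>, Task> middleware)
        => components.Add(middleware);

    // Runs the middleware in registration order, then the final handler.
    public Task RunAsync(IDictionary<string, object> context, Func<Task> handler)
    {
        Func<Task> next = handler;

        // Wrap from the last registered middleware back to the first,
        // so that the first one ends up outermost.
        for (int i = components.Count - 1; i >= 0; i--)
        {
            var current = components[i];
            var innerNext = next;
            next = () => current(context, innerNext);
        }

        return next();
    }
}
```

A middleware registered first runs first and decides when (or whether) to call the rest of the chain, which is what lets an extension wrap the execution of the orchestrator code.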

Queue message listener

Now since the durability provider in Durable Functions uses Storage Queues for messaging, we should find a loop somewhere that is polling for messages from those queues. And we do, in OrchestrationSessionManager.DequeueLoop.

But how do we get there? In part 1, we saw that the AppLeaseManager was started as part of instance startup. If this instance was able to acquire the application lease, it will have started its partition manager. One of the partition manager's duties is acquiring an "ownership" lease for the control queues.

A callback is set up in AzureStorageOrchestrationService that runs when this lease is acquired:

Starting to listen for control queue messages sequence diagram

The callback creates a ControlQueue and ensures it exists. Then it adds the queue to the OrchestrationSessionManager so that it can start listening for messages in the background through DequeueLoop.

Sending the orchestration start message

Let's start the orchestration now, shall we? Here's the code responsible for doing that again:

// starter is an IDurableClient
string instanceId = await starter.StartNewAsync("Function1", null);

At a high level, this is what happens when StartNewAsync is called:

StartNewAsync sequence diagram

Let's walk through that step by step. The DurableClient class does some validation and the call ends up in TaskHubClient.InternalCreateOrchestrationInstanceWithRaisedEventAsync.

An instance id is generated (Guid.NewGuid().ToString("N")) if not specified. The input object is serialized, and then the method calls into AzureStorageOrchestrationService.CreateTaskOrchestrationAsync.
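As a small illustration of the id fallback, the sketch below returns the caller's id when one is given and otherwise generates one with the "N" format, which produces 32 hexadecimal characters without hyphens. InstanceIdSketch and the exact "not specified" check are assumptions made for this example, not the TaskHubClient code.

```csharp
using System;

// Hypothetical helper illustrating the instance id fallback.
public static class InstanceIdSketch
{
    // Returns the requested id when one is given; otherwise a new 32-character hex id.
    public static string Resolve(string requestedId)
        => string.IsNullOrWhiteSpace(requestedId)
            ? Guid.NewGuid().ToString("N")
            : requestedId;
}
```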

So what does CreateTaskOrchestrationAsync do? First it ensures that the task hub exists through EnsureTaskHubAsync. Since this was already done during the startup covered in part 1, this method does nothing this time.

Next DurableTask checks that there isn't another orchestration already in a conflicting state. Since you can specify the orchestration instance id manually, trying to start an orchestration with the same id when another is already running leads to an error here.

Then a message is sent to the control queue that will start the orchestration. The queue is chosen by passing the orchestration instance id through a Fowler-Noll-Vo (FNV-1a) hash function and taking the result modulo the partition count. In pseudocode that is something like this:

int queueIndex = Fnv1a(instanceId) % partitionCount;

This is to ensure good distribution among the partitions. Since each queue can have only one listener, it is critical for scaling that all queues get evenly used.
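The pseudocode above can be fleshed out into a runnable sketch. It uses the standard 32-bit FNV-1a constants and assumes the instance id is hashed as UTF-8 bytes; PartitionSketch and its method names are made up for this example, so treat it as an approximation rather than the exact DurableTask implementation.

```csharp
using System;
using System.Text;

// Hypothetical helper showing FNV-1a based queue selection.
public static class PartitionSketch
{
    private const uint OffsetBasis = 2166136261; // standard 32-bit FNV offset basis
    private const uint Prime = 16777619;         // standard 32-bit FNV prime

    // 32-bit FNV-1a: XOR each byte into the hash, then multiply by the prime.
    public static uint Fnv1a(string value)
    {
        uint hash = OffsetBasis;
        foreach (byte b in Encoding.UTF8.GetBytes(value))
        {
            unchecked
            {
                hash ^= b;
                hash *= Prime; // overflow wraps around on purpose
            }
        }
        return hash;
    }

    // Maps an instance id to a control queue index in [0, partitionCount).
    public static int GetQueueIndex(string instanceId, int partitionCount)
        => (int)(Fnv1a(instanceId) % (uint)partitionCount);
}
```

Keeping the hash unsigned means the modulo can never produce a negative index, and the same instance id always maps to the same queue as long as the partition count does not change.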

Finally a row containing the status is added to the instances table for the orchestration.

Receiving the message and preparing it for processing

Now that we have a message in a control queue, we can return to OrchestrationSessionManager.DequeueLoop and see what happens when it gets a message.

Small disclaimer: while this will look complicated, in reality it is actually more complicated. We are going to skip the parts where messages are added to existing sessions. Sessions in DurableTask's Azure Storage implementation are objects that contain, for example, the current state of an orchestration.

Normally when all messages have been processed for an orchestration, its session is removed from memory. As far as I understand there is only one scenario where messages would get added to an existing session: extended sessions are enabled and the session was still in memory. Since everything is async, there might be other situations as well.

Since we are just starting the orchestration, we won't have a session in memory.

Let's look at what the simplified view of the dequeue loop looks like:

Dequeue loop sequence diagram

ControlQueue.GetMessagesAsync returns the message sent earlier. Messages are always received in batches here (up to 32 messages, which is both the default and the maximum). Next, OrchestrationSessionManager.DedupeExecutionStartedMessagesAsync checks that every message has a corresponding row in the instances table. It also erases redundant messages. Since the queue message is sent before the instances table row is written, a race condition is possible where the queue message is received before the row exists. If the row is still missing, the message is abandoned and put back in the queue to be retried.
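The race handling can be pictured with a very reduced sketch. It only models the "row exists or not" decision on plain instance ids; DedupeSketch and its parameters are hypothetical and leave out everything else the real method does, such as removing redundant messages.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, heavily simplified model of the "is the instances row there yet?" check.
public static class DedupeSketch
{
    // Splits start messages into those ready to process and those to abandon for a retry.
    public static (List<string> Ready, List<string> Abandoned) Split(
        IEnumerable<string> startMessageInstanceIds,
        ISet<string> instanceIdsWithRow)
    {
        var ready = new List<string>();
        var abandoned = new List<string>();

        foreach (string instanceId in startMessageInstanceIds)
        {
            if (instanceIdsWithRow.Contains(instanceId))
            {
                ready.Add(instanceId);     // row already written, safe to continue
            }
            else
            {
                abandoned.Add(instanceId); // row not visible yet, retry later
            }
        }

        return (ready, abandoned);
    }
}
```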

OrchestrationSessionManager.AddMessageToPendingOrchestration organizes the received messages into batches and kicks off a pre-fetch of the orchestration state from the history table in the background. Messages are batched by the pair of instance id and execution id. A short explanation of those ids:

  • Instance id: stays the same across replays and ContinueAsNew executions
  • Execution id: stays the same across replays but is different for ContinueAsNew executions

The same orchestration instance with another execution id is also called a "different generation" within DurableTask.
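Batching by the id pair can be sketched with a plain GroupBy. SketchMessage and BatchingSketch are hypothetical types invented for this example; the real batches carry more state, such as the pre-fetched history.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical message type with only the fields needed for batching.
public sealed class SketchMessage
{
    public SketchMessage(string instanceId, string executionId, string eventType)
    {
        InstanceId = instanceId;
        ExecutionId = executionId;
        EventType = eventType;
    }

    public string InstanceId { get; }
    public string ExecutionId { get; }
    public string EventType { get; }
}

public static class BatchingSketch
{
    // Groups messages so that each batch holds a single (instance id, execution id) pair.
    public static Dictionary<(string InstanceId, string ExecutionId), List<SketchMessage>> Group(
        IEnumerable<SketchMessage> messages)
        => messages
            .GroupBy(m => (m.InstanceId, m.ExecutionId))
            .ToDictionary(g => g.Key, g => g.ToList());
}
```

With this grouping, two messages for the same instance but different execution ids (different generations) land in separate batches.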

Once the state pre-fetch in OrchestrationSessionManager.ScheduleOrchestrationStatePrefetch is done, it adds the message batch to the readyForProcessingQueue.

Work item dispatcher

As part of instance startup, a WorkItemDispatcher is started for orchestration and activity work items (one for each). Here is how the orchestration dispatcher starts getting messages from OrchestrationSessionManager's readyForProcessingQueue and how they get handled:

Orchestrator Function execution sequence diagram

Sorry for the complex diagram; TaskOrchestrationDispatcher and WorkItemDispatcher calling each other makes it a bit hard to draw.

WorkItemDispatcher.StartAsync starts DispatchAsync in a fire-and-forget manner: it kicks off the call but does not wait for it to finish. DispatchAsync starts a loop to get messages from the FetchWorkItem callback. This callback is linked to TaskOrchestrationDispatcher.OnFetchWorkItemAsync, through which the call gets to OrchestrationSessionManager.GetNextSessionAsync.

Now we finally get to dequeue a pending message batch from readyForProcessingQueue. A new session is created and returned. Then AzureStorageOrchestrationService.LockNextTaskOrchestrationWorkItemAsync creates and returns a TaskOrchestrationWorkItem.

DispatchAsync runs WorkItemDispatcher.ProcessWorkItemAsync in a similar fire-and-forget way using Task.Run. This leads to a chain of method calls that goes through DispatchMiddlewarePipeline and FunctionExecutionHelper to finally use ITriggeredFunctionExecutor.TryExecuteAsync to call your orchestrator Function!
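The overall shape of this loop (fetch through a callback, then process in the background without awaiting) can be sketched roughly as follows. SketchDispatcher is a hypothetical, stripped-down class, and using Task.Run for both the loop and the processing is an assumption of the sketch; the real WorkItemDispatcher also handles concurrency limits, errors and back-off.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified dispatcher; not the DurableTask WorkItemDispatcher.
public sealed class SketchDispatcher<T>
{
    private readonly Func<CancellationToken, Task<T>> fetchWorkItem;
    private readonly Func<T, Task> processWorkItem;
    private readonly CancellationTokenSource stop = new CancellationTokenSource();
    private Task loop;

    public SketchDispatcher(
        Func<CancellationToken, Task<T>> fetchWorkItem,
        Func<T, Task> processWorkItem)
    {
        this.fetchWorkItem = fetchWorkItem;
        this.processWorkItem = processWorkItem;
    }

    // Starts the loop without awaiting it, so StartAsync returns right away.
    public Task StartAsync()
    {
        loop = Task.Run(() => DispatchAsync(stop.Token));
        return Task.CompletedTask;
    }

    // Signals the loop to stop and waits for it to exit.
    public async Task StopAsync()
    {
        stop.Cancel();
        if (loop != null)
        {
            await loop;
        }
    }

    private async Task DispatchAsync(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            T item;
            try
            {
                item = await fetchWorkItem(token);
            }
            catch (OperationCanceledException)
            {
                break; // stop was requested while waiting for work
            }

            // Fire-and-forget: process in the background so the loop can fetch the next item.
            _ = Task.Run(() => processWorkItem(item));
        }
    }
}
```

Because processing is not awaited, a slow work item does not block the loop from fetching the next one.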

Summary

Quite a bit happens behind the scenes just to kick off an orchestration! At a high level, simplified:

  1. IDurableClient.StartNewAsync
  2. Message sent to control queue + row added in instances table
  3. The message receive loop receives the message and organizes all received messages into batches; the history pre-fetch is started
  4. Message batches added to an in-memory queue
  5. The message batch is picked up by another receive loop
  6. Session and work item created
  7. Durable Functions extension runs the actual orchestration trigger Function

Next time we will look at how the orchestrator runs and queues up an activity to execute.