Phew, it's been a while huh? Last time in part 3 we looked at how an activity gets scheduled and how its result gets handled. But the bit missing is how does the activity itself run? This is a relatively simple part of the framework.
Call chain for activity execution
With the Azure Storage durability provider, there is a work item queue where messages for activities go. So we should find a place in the code where a message is read from this queue. And indeed we do. The call chain looks like this:
WorkItemDispatcher<TaskActivityWorkItem>.DispatchAsync(context)(The call to StartAsync comes from TaskActivityDispatcher and then further in the framework, I won't bore you with that)TaskActivityDispatcher.OnFetchWorkItemAsync(receiveTimeout, cancellationToken)TaskActivityDispatcher.ProcessWorkItemAsync(workItem)INameVersionObjectManager<TaskActivity>.GetObject(name, version)(Finds the activity method)TaskActivity.RunAsync(context, input)(Executes your activity, puts output in astring?variable)- Sets a
TaskCompletedEventas the result (orTaskFailedEventif there was an error) AzureStorageOrchestrationService.CompleteTaskActivityWorkItemAsync(workItem, responseTaskMessage)AzureStorageOrchestrationService.GetControlQueueAsync(instanceId)(Locates right control queue with hash of instance ID)ControlQueue.AddMessageAsync(responseTaskMessage, session)WorkItemQueue.DeleteMessageAsync(session.MessageData, session)(Note the order here! First the response is sent, and then the triggering message is deleted. This way if the worker crashes after sending the result back, worst-case scenario is the activity runs again. If it was the other way, we would lose the response.)
How it works
WorkItemDispatcher and TaskActivityDispatcher are the two core components here.
They coordinate message retrieval, lock updates for messages etc.
There are some abstraction layers but ultimately a message is read from the work item queue.
As a reminder, all worker instances listen to the one work item queue and compete for messages.
The Storage queue message content is the TaskScheduled event that you can see in the history table.
The activity gets executed, and a result event is constructed depending if we succeeded or failed. So this will be either a TaskCompleted or a TaskFailed event.
This gets sent back to the control queue for the orchestration instance. The instance ID is part of the original TaskScheduled event, so we can calculate the hash to map the response to the right control queue. Then once that is done, the message that triggered the activity is deleted.
With other durability providers the only differences are the implementations for LockNextTaskActivityWorkItem and CompleteTaskActivityWorkItemAsync.
Summary
Compared to executing orchestrators, this is much simpler as expected. Not sure yet what the next part will be one, maybe we could look at Durable Entities?