Durable Functions allow you to run work in parallel and then do something once all of it has completed. This is called the fan-out/fan-in scenario. Some time ago I was faced with the problem of tracking the status of these individual activities. Could we somehow report which activities have completed and which ones have not?
One approach I've used in the past is to update a flag in a database at the end of the sub-orchestrator or activity function. But what if we don't want to use a database for this? Doesn't Durable Functions know which ones have completed? If we can somehow access this information, we could report it in the orchestrator's custom status that can then be queried through the Durable Functions REST API.
In this article we will walk through the approach I used for this. If you just want the complete code, it'll be close to the end of the article.
Approaches that did not work
At first I thought I could maybe check the Task objects' Status property. But the status will always be Created until the task is awaited. And if we use the typical approach of await Task.WhenAll(tasks), we won't be able to track individual activities/sub-orchestrators, since by the time we continue execution past the await, all of the tasks are already completed.
Then I thought that I could just await the Tasks individually in another loop, something like:
var tasks = new List<Task>();
for (int i = 0; i < 3; i++)
{
tasks.Add(context.CallActivityAsync("Activity", ""));
}
foreach (var task in tasks)
{
await task;
// Update status
}
But now if the second task completes before the first, we won't know until the first task has completed. The second loop will wait until the first task completes before even checking the second one. So this doesn't really work either.
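To see the problem concretely, here is a minimal sketch that uses plain .NET tasks instead of activities. This is an assumption made purely for illustration: SequentialAwaitDemo and the delays are made up, and Task.Delay stands in for the activity calls. It records the order in which the loop notices each task.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class SequentialAwaitDemo
{
    // Starts one stand-in task per delay, then awaits them in list order.
    // Returns the indexes in the order the loop observed them as finished.
    public static async Task<List<int>> ObserveInListOrderAsync(int[] delaysMs)
    {
        var tasks = delaysMs.Select(d => Task.Delay(d)).ToList();
        var observed = new List<int>();
        for (int i = 0; i < tasks.Count; i++)
        {
            // Blocks here until task i is done, even if later tasks finished earlier
            await tasks[i];
            observed.Add(i);
        }
        return observed;
    }
}
```

Calling ObserveInListOrderAsync(new[] { 300, 50, 150 }) gives back 0, 1, 2 even though the second task finished first. The loop only learns about it once the 300 ms task is done.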
The approach I used
After some thought I came up with the following approach:
- Use await Task.WhenAny(unfinishedTasks) to wait for any of the tasks to finish
- Find the task that finished and remove it from the unfinished tasks collection
- Update orchestrator status
- Repeat until all the tasks are completed
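The steps above can be sketched with plain .NET tasks before bringing Durable Functions into it. Treat this as an illustration under assumptions: WhenAnyLoopDemo is a hypothetical name, Task.Delay stands in for activities, and the status update is replaced by recording the task's index. It also literally removes the finished task from a pending list, which is just one way to keep track of the unfinished tasks.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAnyLoopDemo
{
    // Returns task indexes in the order in which the tasks actually finished.
    public static async Task<List<int>> ObserveInCompletionOrderAsync(int[] delaysMs)
    {
        var tasks = delaysMs.Select(d => Task.Delay(d)).ToList();
        var pending = new List<Task>(tasks);
        var observed = new List<int>();

        while (pending.Count > 0)
        {
            // Wait for any one of the unfinished tasks
            var done = await Task.WhenAny(pending);
            // Remove it so that the next WhenAny only sees unfinished tasks
            pending.Remove(done);
            // This is where the status update would go
            observed.Add(tasks.IndexOf(done));
        }
        return observed;
    }
}
```

With delays of 300, 50 and 150 ms the result should be 1, 2, 0: each task is noticed as soon as it finishes, regardless of its position in the list.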
So let's get building! We will kick off a set of activities to run in parallel in our orchestrator:
const int TaskCount = 5;
var tasks = new List<Task>(TaskCount);
for (int i = 0; i < TaskCount; i++)
{
var task = context.CallActivityAsync("Function1_Test", "");
tasks.Add(task);
}
Each of these activity functions will take 10-20 seconds to execute and will also fail around 10% of the time:
[FunctionName("Function1_Test")]
public static async Task Test(
[ActivityTrigger] IDurableActivityContext context, ILogger log)
{
var delay = 10_000 + _rand.Next(10_000);
log.LogInformation("Activity running with {Delay} ms delay", delay);
await Task.Delay(delay);
// Fail around 10% of the time
if (_rand.NextDouble() > 0.9)
{
log.LogWarning("Random activity failure");
throw new Exception("Random activity failure");
}
}
This should allow us to see the statuses update out-of-order and also see that we are tracking failures correctly.
I defined an enum to represent the different states of the activities:
public enum ActivityStatus
{
Started,
Completed,
Failed
}
The status of an activity will be Started at first, and will eventually be either Completed or Failed.
We can start by setting the orchestrator status to its starting value:
var activityStatuses = new ActivityStatus[tasks.Count];
context.SetCustomStatus(activityStatuses.Select(s => s.ToString()));
One thing to note is that we set the orchestrator's custom status to an array of statuses here since we only need to track this one process running in parallel. If you need to put more data in there than just the status of a single set of tasks, then you need to use a data structure that allows you to store all of that information.
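As a rough sketch of what such a data structure could look like, the class below wraps the activity statuses together with one other piece of data. OrchestrationProgress and its properties are hypothetical names and not part of Durable Functions. The serialization call uses System.Text.Json only to show the resulting shape; the JSON that the Durable Functions runtime produces for the same object may differ.

```csharp
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical container for everything we want to expose as custom status
public class OrchestrationProgress
{
    public string CurrentStep { get; set; }
    public List<string> ActivityStatuses { get; set; } = new List<string>();
}

public static class CustomStatusShapeDemo
{
    // Serializes the progress object so we can inspect its JSON shape
    public static string ToJson(OrchestrationProgress progress)
    {
        return JsonSerializer.Serialize(progress);
    }
}
```

In the orchestrator you would then pass an instance of this class to context.SetCustomStatus instead of the plain list of strings, and update the ActivityStatuses property as tasks complete.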
Then we can start the loop where we wait for tasks to complete (code slightly simplified from final version with added comments):
var doneActivityCount = 0;
while (doneActivityCount < tasks.Count)
{
// Get the tasks that are not done
var notDoneTasks = tasks.Where((t, i) => activityStatuses[i] == ActivityStatus.Started);
// Wait for one of the tasks to complete
var doneTask = await Task.WhenAny(notDoneTasks);
// Find which one completed
var doneTaskIndex = tasks.FindIndex(t => ReferenceEquals(t, doneTask));
// Update status for this task and increment done count
activityStatuses[doneTaskIndex] = GetActivityStatusFromTask(doneTask);
doneActivityCount++;
if (!context.IsReplaying)
{
// Only update status when not replaying
context.SetCustomStatus(activityStatuses.Select(s => s.ToString()));
}
}
The comments should help you understand what is happening.
At the core of it is that we use Task.WhenAny on the tasks that have not yet completed. Of note is that WhenAny will return the Task that completed regardless of whether it failed or succeeded.
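This WhenAny behavior is easy to verify with plain .NET tasks. A minimal sketch, with WhenAnyFaultDemo as a made-up name and Task.FromException standing in for a failed activity:

```csharp
using System;
using System.Threading.Tasks;

public static class WhenAnyFaultDemo
{
    // Awaiting WhenAny does not rethrow the failure; it hands back the faulted task.
    public static async Task<TaskStatus> StatusOfFirstFinishedAsync()
    {
        Task failing = Task.FromException(new InvalidOperationException("boom"));
        Task slow = Task.Delay(1000);

        Task done = await Task.WhenAny(failing, slow);
        return done.Status; // TaskStatus.Faulted
    }
}
```

The exception only surfaces if you await the returned task itself or read its Exception property, which is why the loop can keep going after a failure.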
Also note that we only update the status if we are not replaying. I'm actually not sure if the old values would become visible through the API, but checking the implementation, it will serialize the given object regardless of whether we are replaying or not. And that's a bit unnecessary to do for the past statuses.
We utilize a small utility function to map the Task's Status to our enum:
[Deterministic]
private static ActivityStatus GetActivityStatusFromTask(Task task)
{
return task.Status switch
{
TaskStatus.Created => ActivityStatus.Started,
TaskStatus.WaitingForActivation => ActivityStatus.Started,
TaskStatus.WaitingToRun => ActivityStatus.Started,
TaskStatus.Running => ActivityStatus.Started,
TaskStatus.WaitingForChildrenToComplete => ActivityStatus.Started,
TaskStatus.RanToCompletion => ActivityStatus.Completed,
TaskStatus.Canceled => ActivityStatus.Failed,
TaskStatus.Faulted => ActivityStatus.Failed,
_ => throw new NotImplementedException(),
};
}
In practice the only statuses we will see (at least for activities) are Created, RanToCompletion, and Faulted. But this function implements the rest of them as well for the sake of completeness.
At this point we are basically done! We can put the looping logic in a helper function:
[Deterministic]
private static async Task WhenAllWithStatusUpdate(
IDurableOrchestrationContext context,
ILogger logger,
List<Task> tasks)
{
var activityStatuses = new ActivityStatus[tasks.Count];
context.SetCustomStatus(activityStatuses.Select(s => s.ToString()));
var doneActivityCount = 0;
while (doneActivityCount < tasks.Count)
{
// Wait for one of the not done tasks to complete
var notDoneTasks = tasks.Where((t, i) => activityStatuses[i] == ActivityStatus.Started);
var doneTask = await Task.WhenAny(notDoneTasks);
// Find which one completed
var doneTaskIndex = tasks.FindIndex(t => ReferenceEquals(t, doneTask));
// Sanity check
if (doneTaskIndex < 0 || activityStatuses[doneTaskIndex] != ActivityStatus.Started)
{
throw new Exception(
"Something went wrong, completed task not found or it was already completed");
}
activityStatuses[doneTaskIndex] = GetActivityStatusFromTask(doneTask);
doneActivityCount++;
if (!context.IsReplaying)
{
// Only log and update status when not replaying
logger.LogInformation(
"Task {Index} completed, status {Status}, {Count} tasks done",
doneTaskIndex,
activityStatuses[doneTaskIndex],
doneActivityCount);
context.SetCustomStatus(activityStatuses.Select(s => s.ToString()));
}
}
var failedTasks = tasks.Where(t => t.Exception != null).ToList();
if (failedTasks.Count > 0)
{
throw new AggregateException(
"One or more operations failed", failedTasks.Select(t => t.Exception));
}
}
Similar to Task.WhenAll(), the function throws an exception at the end if one or more tasks failed.
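One detail worth knowing if you catch this exception: each task's Exception property is itself an AggregateException, so the exception built this way is nested. The sketch below shows one way to get at the original errors with Flatten(). FailureSummaryDemo is a hypothetical helper, and it works on plain, already finished .NET tasks rather than real activities.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class FailureSummaryDemo
{
    // Builds the same kind of exception as the helper does, from already finished tasks.
    public static AggregateException Combine(IEnumerable<Task> tasks)
    {
        var failed = tasks.Where(t => t.Exception != null).ToList();
        return new AggregateException(
            "One or more operations failed", failed.Select(t => t.Exception));
    }

    // Flatten() unwraps the nested AggregateExceptions into the original errors.
    public static List<string> Messages(AggregateException ex)
    {
        return ex.Flatten().InnerExceptions.Select(e => e.Message).ToList();
    }
}
```

For one successful task and two tasks that failed with the messages "a" and "b", Messages(Combine(tasks)) returns "a" and "b". In a real orchestrator the messages would be the activity failure messages produced by Durable Functions.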
Then we can use it in the orchestrator:
[FunctionName("Function1")]
public static async Task RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context,
ILogger logger)
{
const int TaskCount = 5;
var tasks = new List<Task>(TaskCount);
for (int i = 0; i < TaskCount; i++)
{
var task = context.CallActivityAsync("Function1_Test", "");
tasks.Add(task);
}
await WhenAllWithStatusUpdate(context, logger, tasks);
}
If we now get the instance status while it is running, we can see the status of individual tasks:
{
"customStatus": ["Completed", "Completed", "Started", "Started", "Completed"]
}
If one of the tasks fails, we will see something like this:
{
"customStatus": [
"Completed",
"Completed",
"Completed",
"Completed",
"Failed"
],
"output": "Orchestrator function 'Function1' failed: One or more operations failed (One or more errors occurred. (The activity function 'Function1_Test' failed: \"Random activity failure\". See the function execution logs for additional details.))"
}
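For an application consuming this information, here is a sketch that parses a status response like the ones above and counts the finished tasks. It assumes the response body contains a customStatus array of strings as shown. StatusReader is a hypothetical helper, and fetching the JSON from the status query endpoint is left out.

```csharp
using System.Linq;
using System.Text.Json;

public static class StatusReader
{
    // Returns (done, total), where done counts everything that is no longer "Started".
    public static (int Done, int Total) CountDone(string statusJson)
    {
        using var doc = JsonDocument.Parse(statusJson);
        var statuses = doc.RootElement
            .GetProperty("customStatus")
            .EnumerateArray()
            .Select(e => e.GetString())
            .ToList();
        return (statuses.Count(s => s != "Started"), statuses.Count);
    }
}
```

For the first response above, with three Completed and two Started entries, CountDone returns (3, 5), which is enough to drive something like a progress bar.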
Completed functions app
Here is the complete source code of the Durable Function for reference:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
namespace DurableFunctionsMonitorProcess
{
public static class Function1
{
private static readonly Random _rand = new Random();
[FunctionName("Function1")]
public static async Task RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context,
ILogger logger)
{
const int TaskCount = 5;
var tasks = new List<Task>(TaskCount);
for (int i = 0; i < TaskCount; i++)
{
var task = context.CallActivityAsync("Function1_Test", "");
tasks.Add(task);
}
await WhenAllWithStatusUpdate(context, logger, tasks);
}
[Deterministic]
private static async Task WhenAllWithStatusUpdate(
IDurableOrchestrationContext context,
ILogger logger,
List<Task> tasks)
{
var activityStatuses = new ActivityStatus[tasks.Count];
var doneActivityCount = 0;
context.SetCustomStatus(activityStatuses.Select(s => s.ToString()));
while (doneActivityCount < tasks.Count)
{
// Wait for one of the not done tasks to complete
var notDoneTasks = tasks.Where((t, i) => activityStatuses[i] == ActivityStatus.Started);
var doneTask = await Task.WhenAny(notDoneTasks);
// Find which one completed
var doneTaskIndex = tasks.FindIndex(t => ReferenceEquals(t, doneTask));
// Sanity check
if (doneTaskIndex < 0 || activityStatuses[doneTaskIndex] != ActivityStatus.Started)
{
throw new Exception(
"Something went wrong, completed task not found or it was already completed");
}
activityStatuses[doneTaskIndex] = GetActivityStatusFromTask(doneTask);
doneActivityCount++;
if (!context.IsReplaying)
{
// Only log and update status when not replaying
logger.LogInformation(
"Task {Index} completed, status {Status}, {Count} tasks done",
doneTaskIndex,
activityStatuses[doneTaskIndex],
doneActivityCount);
context.SetCustomStatus(activityStatuses.Select(s => s.ToString()));
}
}
var failedTasks = tasks.Where(t => t.Exception != null).ToList();
if (failedTasks.Count > 0)
{
throw new AggregateException(
"One or more operations failed", failedTasks.Select(t => t.Exception));
}
}
[Deterministic]
private static ActivityStatus GetActivityStatusFromTask(Task task)
{
return task.Status switch
{
TaskStatus.Created => ActivityStatus.Started,
TaskStatus.WaitingForActivation => ActivityStatus.Started,
TaskStatus.WaitingToRun => ActivityStatus.Started,
TaskStatus.Running => ActivityStatus.Started,
TaskStatus.WaitingForChildrenToComplete => ActivityStatus.Started,
TaskStatus.RanToCompletion => ActivityStatus.Completed,
TaskStatus.Canceled => ActivityStatus.Failed,
TaskStatus.Faulted => ActivityStatus.Failed,
_ => throw new NotImplementedException(),
};
}
[FunctionName("Function1_Test")]
public static async Task Test(
[ActivityTrigger] IDurableActivityContext context, ILogger log)
{
var delay = 10_000 + _rand.Next(10_000);
log.LogInformation("Activity running with {Delay} ms delay", delay);
await Task.Delay(delay);
// Fail around 10% of the time
if (_rand.NextDouble() > 0.9)
{
log.LogWarning("Random activity failure");
throw new Exception("Random activity failure");
}
}
[FunctionName("Function1_HttpStart")]
public static async Task<HttpResponseMessage> HttpStart(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestMessage req,
[DurableClient] IDurableOrchestrationClient starter,
ILogger log)
{
// Start a new orchestration instance; this orchestrator takes no input.
string instanceId = await starter.StartNewAsync("Function1", null);
log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
return starter.CreateCheckStatusResponse(req, instanceId);
}
}
}
public enum ActivityStatus
{
Started,
Completed,
Failed
}
Durable Functions Monitor
As an alternative, you can also use the Durable Functions Monitor tool. It can tell you which activities succeeded and how long they took, and it has a bunch of other really fancy features. For example, here is what the instance details view looks like in the VS Code extension:
This tool works great if a person wants to see the process. The approach above on the other hand works for applications that need to access this information.
Summary
The TL;DR is that you can use await Task.WhenAny() on the set of unfinished tasks, find the returned task from the list of tasks to know which one finished, and then update the status accordingly.
This approach should work similarly for activities, sub-orchestrators, events etc.
If you want a graphical tool for showing you this info and more, I highly recommend checking Durable Functions Monitor. The VS Code extension version is really easy to install and use.
Hope this was useful for you! There are definitely other approaches to this. You could for example read the activity states from the history table in Azure Storage, similar to what Durable Functions Monitor does. If you think of any other approaches, do let me know in the comments or on Twitter.
Until next time!