In a recent article, I showed how to create a Durable Azure Function. If you are unfamiliar with Durable Functions, I recommend you read that article first.
In that article, the Durable Function called 3 Activity Functions in sequence. No Function executed until the Function before it completed. Sometimes, it is important that Functions execute in a certain order. But sometimes it does not matter in which order a Function executes - only that they each complete successfully before another Activity Function is called. In these cases, executing sequentially is a waste of time. It is more efficient to execute these Azure Functions in parallel.
In this article, I will show how to create a durable function that executes three Activity Functions in parallel; then waits for all 3 to complete before executing a fourth function.
Fig. 1 illustrates this pattern.
Fig. 1
As we noted in the earlier article, a Durable function is triggered by a starter function, which is in turn triggered by an HTTP request, database change, timer, or any of the many triggers supported by Azure Functions, as shown in Fig. 2.
I created 4 Activity Functions that do nothing more than write a couple messages to the log (I use LogWarning, because it causes the text to display in yellow, making it easier to find); delay a few seconds (to simulate a long-running task); and return a string consisting of the input string, concatenated with the name of the current function. The functions are nearly identical: Only the Function Name, the message, and the length of delay are different.
The 4 functions are shown below:
public static class Function1 { [FunctionName("Function1")] public static async Task<string> Run( [ActivityTrigger] string msg, ILogger log) { log.LogWarning("This is Function 1"); await Task.Delay(15000); log.LogWarning("Function 1 completed"); msg += "Function 1"; return msg; } }
Listing 1
public static class Function2 { [FunctionName("Function2")] public static async Task<string> Run( [ActivityTrigger] string msg, ILogger log) { log.LogWarning("This is Function 2"); await Task.Delay(10000); log.LogWarning("Function 2 completed"); msg += "Function 2"; return msg; } }
Listing 2
public static class Function3 { [FunctionName("Function3")] public static async Task<string> Run( [ActivityTrigger] string msg, ILogger log) { log.LogWarning("This is Function 3"); await Task.Delay(5000); log.LogWarning("Function 3 completed"); msg += "Function 3"; return msg; } }
Listing 3
public static class Function4 { [FunctionName("Function4")] public static async Task<string> Run( [ActivityTrigger] string msg, ILogger log) { log.LogWarning("This is Function 4"); int secondsDelay = new Random().Next(8, 12); await Task.Delay(1000); log.LogInformation("Function 4 completed"); msg += "\n\rFunction 4"; return msg; } }
Listing 4
We use the Parallel Task library to launch the first 3 functions and have them run in parallel; then, wait until each of the first 3 complete before executing the 4th Activity Function.
Listing 5 shows this code in our Durable Orchestration function.
public static class DurableFunction1 { [FunctionName("DurableFunction1")] public static async TaskRun( [OrchestrationTrigger] DurableOrchestrationContext ctx, ILogger log) { var msg = "Durable Function: "; var parallelTasks = new List string>>(); Task<string> task1 = ctx.CallActivityAsync<string>("Function1", msg); parallelTasks.Add(task1); Task<string> task2 = ctx.CallActivityAsync<string>("Function2", msg); parallelTasks.Add(task2); Task<string> task3 = ctx.CallActivityAsync<string>("Function3", msg); parallelTasks.Add(task3); await Task.WhenAll(parallelTasks); // All 3 Activity functions finished msg = task1.Result + "\n\r" + task2.Result + "\n\r" + task3.Result; // Use LogWarning, so it shows up in Yellow, making it easier to spot log.LogWarning($"All 3 Activity functions completed for orchestration {ctx.InstanceId}!"); msg = await ctx.CallActivityAsync<string>("Function4", msg); log.LogWarning(msg); return new OkObjectResult(msg); } }
Listing 5
We create a new List of Tasks and add each activity to that list:
var msg = "Durable Function: ";
var parallelTasks = new Liststring>>();
Task<string> task1 = ctx.CallActivityAsync<string>("Function1", msg);
parallelTasks.Add(task1);
Task<string> task2 = ctx.CallActivityAsync<string>("Function2", msg);
parallelTasks.Add(task2);
Task<string> task3 = ctx.CallActivityAsync<string>("Function3", msg);
parallelTasks.Add(task3);
The following line tells the system to wait until all 3 tasks in that list are completed.
await Task.WhenAll(parallelTasks);
When all 3 tasks complete, we resume the program flow, calling the 4th Activity and logging the output:
log.LogWarning($"All 3 Activity functions completed for orchestration {ctx.InstanceId}!");
msg = await ctx.CallActivityAsync<string>("Function4", msg);
log.LogWarning(msg);
As in the previous article, we launch this Durable Orchestration Function with a starter function (in this case a function with an HTTP trigger), as shown in Listing 6 below.
public static class StarterFunction1 { [FunctionName("StarterFunction1")] public static async TaskRun( [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequestMessage req, [OrchestrationClient] DurableOrchestrationClient starter, TraceWriter log) { log.Info("About to start orchestration"); var orchestrationId = await starter.StartNewAsync("DurableFunction1", log); return starter.CreateCheckStatusResponse(req, orchestrationId); } }
Testing the Orchestration
We can test this orchestration by running the solution, which displays the HTTP Trigger URL, as shown in Fig. 3
We can then open a browser, type the HTTP Trigger URL in the address bar, and press [ENTER] to trigger the function, as shown in Fig. 4
Switch back to the function output to view the messages as they scroll past. You should see output from each of the first 3 functions (although not necessarily in the order called), followed by a message indicating the first 3 are complete; then output from Function 4. This is shown in Fig. 5.
You can view this project under “Durable Functions” in this GitHub repository.
In this article, I showed how to create a Durable Orchestration Function that launches activity functions that run in parallel.