Durable Functions provides a great model for reliably running serverless logic, powered by an easy-to-understand orchestration approach.
One specific of Durable Functions is that it follows an async model. This means you can fire a request via HTTP, a queue, or manually, and you then get an endpoint for monitoring the status of the execution.
However, I really like the orchestration capabilities of Durable Functions and I want to use them as a general approach for building complex workflows, not only in async scenarios. But is it possible to combine these advanced orchestration capabilities with more typical execution scenarios? Most of the clients we have today, mobile apps and APIs, expect sync responses. In other words, they call an API endpoint and wait until there is a response. One way to leverage the orchestration capabilities of Durable Functions is to change the existing clients and move them to async behavior. However, this is a difficult task in the short term.
Why don't we add a sync layer on top of Durable Functions to enable these more common scenarios and still take advantage of the orchestration capabilities?
Usually with Durable Functions we have three main players: the Orchestration Client, the Orchestration Trigger (Orchestrator) and the Activity Trigger (Activity):
In this architecture the Mobile Client needs to check the status of the Orchestrator periodically and will eventually get the output.
Can we delegate this responsibility to an external layer: a new function that is called by the mobile client and returns the output if it becomes available within a pre-configured time frame, or returns the original status check endpoint if the allowed waiting period is over? Below you can see that we are adding a Sync Response Wrapper function:
A sample implementation of the Sync Response Wrapper looks like this:
using System;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using System.Configuration;
using DurableFunc.Model;
using DurableFunc.Services;
using DurableFunc.Utils;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Host;

namespace DurableFunc
{
    public static class SyncResponse
    {
        [FunctionName("SyncResponse")]
        public static async Task<HttpResponseMessage> Run([HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)]HttpRequestMessage req, TraceWriter log)
        {
            // Name of the orchestrator function to start, taken from the query string or the request body.
            var functionName = await Helper.GetParameterValue(req);
            if (string.IsNullOrEmpty(functionName))
            {
                // Return early; without the return the function would continue with an empty name.
                return req.CreateResponse(HttpStatusCode.BadRequest, "Please pass a funcname on the query string or in the request body");
            }

            object result = null;
            string statusUri;
            var orchestrationClientUri = ConfigurationManager.AppSettings["OrchestrationClientUri"];
            using (var httpClient = new HttpClient())
            {
                // Start the orchestration through the regular (async) Orchestration Client.
                var orchestrationClientResponse =
                    await httpClient.PostAsync(new Uri($"{orchestrationClientUri}{functionName}"), null);
                orchestrationClientResponse.EnsureSuccessStatusCode();
                var clientResponse =
                    await orchestrationClientResponse.Content.ReadAsAsync<OrchestrationClientResponse>();
                statusUri = clientResponse.StatusQueryGetUri;

                // Poll the status endpoint until the output is ready or the time box expires.
                var executionDetails = Helper.GetExecutionDetails();
                var durableFunctionSyncResponseService = new DurableFunctionSyncResponseService();
                result = await durableFunctionSyncResponseService.ProvideOutput(clientResponse, executionDetails, log);
            }

            // No output in time: fall back to the async behavior and hand back the status endpoint.
            return result == null
                ? req.CreateResponse(HttpStatusCode.OK, $"The operation is taking more than expected. Keep following the progress here {statusUri}")
                : req.CreateResponse(HttpStatusCode.OK, result);
        }
    }
}
In the Sync Response Wrapper function we call the Orchestration Client, get the status endpoint from its response, and start polling that endpoint for the output. The most interesting part is the service that abstracts the calls to the Orchestration Client status endpoint:
public async Task<object> ProvideOutput(OrchestrationClientResponse clientResponse, ExecutionDetails executionDetails, TraceWriter log)
{
    object result = null;
    using (var httpClient = new HttpClient())
    {
        for (var i = 0; i < executionDetails.Iterations; i++)
        {
            // Wait without blocking the thread (Thread.Sleep would block it inside an async method).
            await Task.Delay(executionDetails.IterationPeriod);
            string statusCheck;
            try
            {
                statusCheck = await httpClient.GetStringAsync(clientResponse.StatusQueryGetUri);
            }
            catch (Exception e)
            {
                // Log the exception and try again on the next iteration.
                log.Error(e.Message);
                continue;
            }

            var status = JsonConvert.DeserializeObject<StatusResponse>(statusCheck);
            if (status.RuntimeStatus != "Completed") { continue; }

            result = status.Output;
            break;
        }
    }
    // null means the orchestration did not complete within the allowed number of iterations.
    return result;
}
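One detail worth noting about the polling loop above: it only stops early on "Completed", so an orchestration that has already failed or been terminated is still polled until the time box runs out. A minimal sketch of a helper that could separate "finished" from "finished successfully" is shown below. The class and method names are hypothetical and not part of the original sample; the status strings are the runtime status values reported by Durable Functions.

```csharp
using System;

// Hypothetical helper, not part of the original sample.
public static class OrchestrationStatus
{
    // Runtime status values after which the orchestration will not change state again.
    private static readonly string[] TerminalStatuses =
        { "Completed", "Failed", "Terminated", "Canceled" };

    // True when polling can stop, whether or not there is usable output.
    public static bool IsTerminal(string runtimeStatus) =>
        Array.Exists(TerminalStatuses,
            s => string.Equals(s, runtimeStatus, StringComparison.OrdinalIgnoreCase));

    // True only when the orchestration finished and its output can be returned.
    public static bool IsSuccess(string runtimeStatus) =>
        string.Equals(runtimeStatus, "Completed", StringComparison.OrdinalIgnoreCase);
}
```

Inside the loop, the check on `status.RuntimeStatus` could then continue only while `IsTerminal` is false, and assign `status.Output` to the result only when `IsSuccess` is true. How a failed orchestration should be reported to the caller is a design decision the original sample leaves open.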
We provide configuration that time-boxes the wait, and we fall back to the async behavior if the orchestration does not complete in time.
The next question is how we can optimize the performance. The disadvantage of this architecture is that we are adding one more component, which increases the overall latency. Can we eliminate some of the network calls?
Yes: let's move the polling logic that produces the sync response into the Orchestration Client. That way we get the status URL endpoint immediately and use the Orchestration Client itself to pick up the updates coming from the Orchestration Trigger:
The code of the Orchestration Client then becomes:
[FunctionName("SyncResponseClient")]
public static async Task<HttpResponseMessage> Run(
    [HttpTrigger(AuthorizationLevel.Function, methods: "post", Route = "orchestrator/{functionName}")] HttpRequestMessage req,
    [OrchestrationClient] DurableOrchestrationClient starter,
    string functionName,
    TraceWriter log)
{
    // Function input comes from the request content.
    dynamic eventData = await req.Content.ReadAsAsync<object>();
    string instanceId = await starter.StartNewAsync(functionName, eventData);
    log.Info($"Started orchestration with ID = '{instanceId}'.");

    // Build the standard status response only to extract the status endpoint from it.
    var responseMessage = starter.CreateCheckStatusResponse(req, instanceId);
    var clientResponse = JsonConvert.DeserializeObject<OrchestrationClientResponse>(await responseMessage.Content.ReadAsStringAsync());

    var executionDetails = Helper.GetExecutionDetails();
    var durableFunctionSyncResponseService = new DurableFunctionSyncResponseService();
    var result = await durableFunctionSyncResponseService.ProvideOutput(clientResponse, executionDetails, log);

    return result == null
        ? req.CreateResponse(HttpStatusCode.OK, $"The operation is taking more than expected. Keep following the progress here {clientResponse.StatusQueryGetUri}")
        : req.CreateResponse(HttpStatusCode.OK, result);
}
We continue to use the same service logic, but we have reduced the number of functions and network calls.
Now let's review a complete example that includes a sub-orchestration. We will call the Sync Response Orchestration Client, which retrieves the names of three cities and, via a sub-orchestration, the current temperature in each of them:
The complete implementation of this sample can be found here: https://github.com/gled4er/durable-functions-sub-orchestrations
In this case the first Orchestrator calls both Activities and another Orchestrator:
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;

namespace DurableFunc
{
    public static class HelloWorld
    {
        [FunctionName("HelloWorld")]
        public static async Task<List<string>> Run([OrchestrationTrigger] DurableOrchestrationContext context)
        {
            var list = new List<string>
            {
                $"{await context.CallActivityAsync<string>("Hello", "Tokyo")}. The temperature is {await context.CallSubOrchestratorAsync<string>("TemperatureService", "Tokyo")}°C",
                $"{await context.CallActivityAsync<string>("Hello", "Seattle")}. The temperature is {await context.CallSubOrchestratorAsync<string>("TemperatureService", "Seattle")}°C",
                $"{await context.CallActivityAsync<string>("Hello", "London")}. The temperature is {await context.CallSubOrchestratorAsync<string>("TemperatureService", "London")}°C"
            };
            return list;
        }
    }
}
For local testing we use the following settings:
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "AzureWebJobsDashboard": "UseDevelopmentStorage=true",
    "WeatherApiKey": "{Weather-API-Key}",
    "OrchestrationClientUri": "http://localhost:7071/api/orchestrators/",
    "MaxExecutionTime": "3000",
    "ExecutionPeriod": "100"
  }
}
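The `MaxExecutionTime` and `ExecutionPeriod` values are what time-box the polling. The sketch below shows one way they could be turned into the `Iterations` and `IterationPeriod` values used by the polling service. It is an illustration only: the factory class, its default values, and the assumption that both settings are in milliseconds are mine, and the actual `Helper.GetExecutionDetails` in the repository may work differently.

```csharp
using System;

// Assumed shape of the details object used by the polling loop.
public class ExecutionDetails
{
    public int Iterations { get; set; }       // how many status checks to attempt
    public int IterationPeriod { get; set; }  // assumed: milliseconds between checks
}

// Hypothetical factory, not part of the original sample.
public static class ExecutionDetailsFactory
{
    public static ExecutionDetails FromSettings(string maxExecutionTime, string executionPeriod)
    {
        // Fall back to illustrative defaults when a setting is missing or invalid.
        if (!int.TryParse(maxExecutionTime, out var maxMs) || maxMs <= 0) maxMs = 3000;
        if (!int.TryParse(executionPeriod, out var periodMs) || periodMs <= 0) periodMs = 100;

        return new ExecutionDetails
        {
            // Always allow at least one status check.
            Iterations = Math.Max(1, maxMs / periodMs),
            IterationPeriod = periodMs
        };
    }
}
```

With the settings above this gives 30 checks, 100 ms apart. Note that each check also spends time on the HTTP call itself, so under this scheme the real upper bound on the wait is somewhat longer than `MaxExecutionTime`.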
And the result we get after execution is:
I am very interested to hear your opinion about this approach and whether you find Durable Functions useful for Sync Response scenarios. Please share your feedback by contacting me at @azurekanio.
Thank you!
Kanio