azure-functions-durable-extension: Orchestrator stuck when processing High volume data (with source code to reproduce it)

Description

Orchestrator gets stuck when processing high volume data.

I hit the issue in a production environment, but I´ve been able to reproduce the issue with a “minimal” project that you can find in this GitHub repo: https://github.com/luismanez/high-volume-data-durable-function-test

See Relevant source code snippets section and the source code in the repo for more information.

This has been tested in an EP1 service plan scaled out to 20 instances in the Maximum burst option, but also happens with just 2 instances.

Thank you for your help, and happy to provide any other information and change the architecture to whatever is recommended by the Durable functions team in order to deal with these kind of scenarios.

Expected behavior

The sub orchestrator completes, and it returns control to the Orchestrator, to continue its workflow

Actual behavior

The sub orchestrator stucks “running”. Sometimes, the sub orchestrator completes, but the main orchestrator stucks “running”.

Relevant source code snippets

First, we have a Timer trigger that fires every 20 minutes. The trigger starts the main orchestrator, but follows the Singleton pattern (copy code from MS official docs), so it first checks if the Orchestration ID (a constant) is still running. if so, does nothing, otherwise, the main orchestrator starts.

This is the main orchestrator source code. You can get more info in source code comments

// This sub-orchestrator just simulates the creation of a very long list of Items (models/SyncWorkspaceDto.cs)
var workspacesToSync = await context.CallGetWorkspacesToSyncSubOrchestrator();
log.LogInformation($"{nameof(WorkspacesSynchronisationOrchestrator)} got workspaces to process");

// This sub-orchestrator iterates the very-long list following fan-out/in pattern. 
// For each element, calls an Activity that simulates calling an external API to get more info about that specific item
// Then, the list is iterated again (applying some filter with the data obtained in previous foreach)
// and for each element, call an Activity that simulates calling another external API to get more info about the item
workspacesToSync = await context.CallGetWorkspacesDetailedInfoSubOrchestrator(workspacesToSync);
log.LogInformation($"{nameof(WorkspacesSynchronisationOrchestrator)} got detailed info for all workspaces to process");

// fan-out/in to Persist Workspaces to DB.
// Foreach item, it calls another Activity that simulates storing the item info in SQL and Azure Search
var workspacesToPersist = workspacesToSync; // in real project, we apply some filter here
var workspacesToPersistCount = workspacesToPersist.Count();
var workspacesToPersistTasks = new Task<bool>[workspacesToPersistCount];
var workspacesToPersistTaskNumber = 0;
foreach (var workspace in workspacesToPersist)
{
    workspacesToPersistTasks[workspacesToPersistTaskNumber] =
                    context.CallPersistWorkspaceActivity(workspace);
    workspacesToPersistTaskNumber++;
}
await Task.WhenAll(workspacesToPersistTasks);

The first sub-orchestrator just fires an Activity that simulates the creating of a very long list of items

    [FunctionName(Constants.AzureFunctions.Orchestrators.GetWorkspacesToSyncSubOrchestrator)]
    public async Task<IEnumerable<SyncWorkspaceDto>> RunOrchestrator(
        [OrchestrationTrigger] IDurableOrchestrationContext context,
        ILogger log)
    {
        // Just calls an Activity that simulates the creating of a very long List (see Activity for more info)
        var workspacesToSync = await context.CallGetGroupWorkspacesFromGraphActivity();

        return workspacesToSync;
    }
    [FunctionName(Constants.AzureFunctions.Activities.Synchronisation.GetGroupWorkspacesFromGraphActivity.Name)]
    public async Task<IEnumerable<SyncWorkspaceDto>> Run(
        [ActivityTrigger] IDurableActivityContext activityContext,
        ILogger log)
    {
        var totalWorkspacesToProcessCount = int.Parse(Environment.GetEnvironmentVariable("NumberOfItemsToProcess") ?? "3");
        log.LogWarning($"GetGroupWorkspacesFromGraphActivity_ItemsToProcess: {totalWorkspacesToProcessCount}. InstanceId: {activityContext.InstanceId}");

        var delay = int.Parse(Environment.GetEnvironmentVariable("GetGroupWorkspacesFromGraphActivityDelayInMiliseconds") ?? "10");
        await Task.Delay(delay);
        log.LogWarning($"GetGroupWorkspacesFromGraphActivityDelay: {delay}");

        var highVolumeList = new List<SyncWorkspaceDto>();

        // Simulate calling an external API and return a High volume list of data
        // The issue happens with about 25k items. Tests were done with 30k.
        // (we have other production environments with about 10k items and are working fine)
        for (var i = 0; i < totalWorkspacesToProcessCount; i++)
        {
            highVolumeList.Add(SyncWorkspaceDto.NewFakeWorkspaceDto());
        }

        return highVolumeList;
    }

Once the very long list of items is returned to the Main orchestrator, a second sub orchestrator is started. This sub-orchestrator does a couple of foreach iterating all the items in the very long list and calling another activity that simulates calling an external API to get more information about the Item.

    [FunctionName(Constants.AzureFunctions.Orchestrators.GetWorkspacesDetailedInfoSubOrchestrator)]
    public async Task<IEnumerable<SyncWorkspaceDto>> RunOrchestrator(
        [OrchestrationTrigger] IDurableOrchestrationContext context,
        ILogger log)
    {
        var workspacesToGetDetailsFor = context.GetInput<IEnumerable<SyncWorkspaceDto>>();

        var workspacesToUpdateFromGraphCount = workspacesToGetDetailsFor.Count();
        var workspacesToUpdateFromGraphTasks = new Task<SyncWorkspaceDto>[workspacesToUpdateFromGraphCount];
        var workspacesToUpdateFromGraphTaskNumber = 0;

        foreach (var workspace in workspacesToGetDetailsFor)
        {
            workspacesToUpdateFromGraphTasks[workspacesToUpdateFromGraphTaskNumber] =
                            context.CallGetWorkspaceDetailedInfoFromGraphActivity(workspace);
            workspacesToUpdateFromGraphTaskNumber++;
        }
        await Task.WhenAll(workspacesToUpdateFromGraphTasks);
        var workspacesToUpdateWithDetailedInfo =
            workspacesToUpdateFromGraphTasks.Select(t => t.Result).Where(workspace => workspace != null);

        // repeat the fan out-in. In real project, we first apply a filter over "workspacesToUpdateWithDetailedInfo"
        // that filter returns most of the items in "workspacesToGetDetailsFor", but not all of them
        // that´s why we need a 2nd fan out-in to complete the filtered list items with info from another data provider
        var workspacesToUpdateFromSharePointCount = workspacesToUpdateWithDetailedInfo.Count();
        var workspacesToUpdateFromSharePointTasks = new Task<SyncWorkspaceDto>[workspacesToUpdateFromSharePointCount];
        var workspacesToUpdateFromSharePointTaskNumber = 0;

        foreach (var workspace in workspacesToUpdateWithDetailedInfo)
        {
            workspacesToUpdateFromSharePointTasks[workspacesToUpdateFromSharePointTaskNumber] =
                            context.CallGetWorkspaceDetailedInfoFromSharePointActivity(workspace);
            workspacesToUpdateFromSharePointTaskNumber++;
        }
        await Task.WhenAll(workspacesToUpdateFromSharePointTasks);

        var workspacesWithFullInfo =
            workspacesToUpdateFromSharePointTasks.Select(t => t.Result).Where(workspace => workspace != null);


        return workspacesWithFullInfo;
    }

According to my tests, it gets stuck in the first fan-out/in foreach. It iterates all the items and the Activity is called and works, but then, the secord foreach is not hit, and the sub-orchestrator and main orchestrator, are running forever.

This is my host.json

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingExcludedTypes": "Request",
      "samplingSettings": {
        "isEnabled": true
      },
      "httpAutoCollectionOptions": {
        "enableW3CDistributedTracing": true
      }
    },
    "logLevel": {
      "default": "Warning",
      "Function": "Information",
      "Microsoft": "Warning",
      "Host.Aggregator": "Warning",
      "Host.Results": "Error"
    }
  },
  "functionTimeout": "00:59:00",
  "extensions": {
    "queues": {
      "maxPollingInterval": "00:00:02",
      "visibilityTimeout": "00:00:30",
      "batchSize": 16,
      "maxDequeueCount": 5,
      "newBatchThreshold": 8
    },
    "durableTask": {
      "hubName": "WorkspacesSynchronisation",
      "tracing": {
        "distributedTracingEnabled": true,
        "distributedTracingProtocol": "W3CTraceContext"
      },
      "storageProvider": {
        "connectionStringName": "DurableStorageUsageConnectionString"
      }
    }
  }
}

Known workarounds

App Details

  <PropertyGroup>
    <TargetFramework>netcoreapp3.1</TargetFramework>
    <AzureFunctionsVersion>v3</AzureFunctionsVersion>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.5.0" />
    <PackageReference Include="Microsoft.NET.Sdk.Functions" Version="3.0.11" />
  </ItemGroup>
  • Durable Functions extension version (e.g. v1.8.3): 2.5.0
  • Azure Functions runtime version (1.0 or 2.0): v3
  • Programming language used: c#

Screenshots

storage-table-instances

The memory and CPU usage during the test:

service-plan-metrics

If deployed to Azure

  • Timeframe issue observed: 2021-11-21T19:52:41.129Z - 2021-11-21T23:20:54.397Z
  • Function App name: https://durablehighvolumedata.azurewebsites.net
  • Function name(s): WorkspacesSynchronisationOrchestrator, GetWorkspacesDetailedInfoSubOrchestrator, GetGroupWorkspacesFromGraphActivity
  • Azure region: West Europe
  • Orchestration instance ID(s): 88ECDF24-9D6B-495F-B10B-87632AA0577C
  • Azure storage account name: durablehighvolumedata

THANK YOU!

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Reactions: 1
  • Comments: 16

Most upvoted comments

Thanks Chris. We have finally managed to workaround-ed the problem by reworking the sub-orchestrators structure and we also set the host.json settings back to its defaults (no extended session nor concurrency limits).

We based the changes on the idea of avoiding a sub-orchestrator to have to handle an output greater than 1,000 activities. Besides, we also limit the concurrency in code to finish the processing of those 1,000 activities before queueing another batch.

The new structure is made of a main orchestrator, an intermediate sub-orchestrator and a sub-sub-orchestrator that makes calls to the activities. The new structure looks like this:

  • Main orchestrator
    • Loop of calls to “Intermediate sub-orchestrator”, sending 330 elements each time (330 because we call 3 activities inside the orchestraror, and we want to avoid more than 1000 activities).
      • var batches = totalItems in chunks of 330 items
      • foreach (var batch in batches) { await context.IntermediateSubOrchestrator(batch) }
  • Intermediate sub-orchestrator
    • Fan in/out of “Sub-orchestrator” (concurrent). 990 (330 x 3) activities will be run in each sub-orchestrator concurrently.
      • foreach (var item in batch) { itemTasks.Add(context.SubOrchestrator(item)) }
      • await Task.WhenAll (itemTasks)
  • Sub-Sub-orchestrator
    • await Activity 1
    • await Activity 2
    • await Activity 3

Having this new structure we achieved the code to be running without any interrruptions for more than 3 days. Besides, no exceptions have been detected either (we didn’t have CPU and memory issues).

Please, could you have a thought and let me know if your foresee any potential issue with this workaround ??

Also a couple of suggestions after all this journey:

  • I´d love to have a “real-world” sample from MS on processing High-volume data. I think my scenario is pretty common in large organizations moving data between systems, so you have a process that returns a huge list of data, and then you want to process each item as fast as possible. I´ve seen this sample https://docs.microsoft.com/en-us/samples/azure-samples/durablefunctions-mapreduce-dotnet/big-data-processing-serverless-mapreduce-on-azure/ … but this is not the same. It just processes 12 csv files, each csv file has thousands of rows, but all of them are processed in just one Activity.
  • As we saw in my sample. The problem is Huge volume data + big input/output objects. It´d be great to have some thresholds. I know it depends on the VM, but I guess you could provide some thresholds (i.e: the serialized object cannot be bigger than 1Mb…)

Many thanks for the final thoughts @cgillum. Makes sense, and I can´t agree more. Let me know if I can help you and @anthonychu with whatever (early tester 😃). Closing the issue, and looking forward to hear from the team about improvements and new samples. Best!

Please, could you have a thought and let me know if your foresee any potential issue with this workaround ??

I don’t foresee any issues with this workaround and I’m happy to hear that it seems to be working well so far (though I wish it weren’t necessary to employ such workarounds - though that’s something we need to find ways to improve on our side).

Feedback taken - we’re aware that we need to produce more “real-world” samples, especially for workloads dealing with large batches of data. /cc @anthonychu.

Rather than introducing new thresholds, we’re aiming to make scenarios “just-work” more reliably by introducing more memory efficiency, throttling, etc. so that folks don’t have to worry as much about the size of their objects. There’s technically no reason for us to introduce a threshold since we don’t have any limits that force us to. Rather, the trick is ensuring that the system behaves in an expected way when objects do become too large (that said, we can certainly add some concrete recommendations in the docs).

Looking at the callstack for the OutOfMemoryException, my guess is that some function input or output is so large that it’s pushing us over the edge in terms of memory. Without digging further into that, the only other thing I can suggest is to try reducing controlerQueueBufferThreshold and controlQueueBatchSize both all the way down to 1. Otherwise, I think you may need to find a way to use smaller payloads.

As for the “Non-Deterministic workflow detected” error, this is possibly from an older orchestration instance that was created using an earlier version of your code which failed when the new version was deployed?

About 1st point, unless I did something wrong, the Storage is only used by my test app.

I took a second look and you’re right. These storage accounts are not being shared. I think the logic which checks for this saw the same instance ID being used across all apps and made incorrect conclusions based on this. I’ll look into having that detection updated.

I´ve tried the “sub-sub-orchestrator” approach 😃, but it didn´t work either.

I can confirm that this at least fixed the long history load problems which were being reported earlier, so if nothing else there was at least some improvement. 😃

At this point it might make sense for us to focus on the OutOfMemory exceptions. Looking at the specific exception messages, they appear to happen when trying to read the large payloads being passed between your functions. Reducing those payload sizes could help, but another thing we can try that doesn’t involve changing your code is reducing some of the default concurrency settings. We document how to do this here. Can you try reducing orchestration and activity concurrency numbers in host.json to see if that helps? Specifically:

  • maxConcurrentActivityFunctions
  • maxConcurrentOrchestratorFunctions
  • controlQueueBufferThreshold

See if setting those to really small numbers helps resolve the out-of-memory issues.