OpenSearch: [BUG] Async shard fetches taking up GBs of memory causing ClusterManager JVM to spike with large no. of shards (> 50k)
Describe the bug
GatewayAllocator performs an async fetch for each unassigned shard to check whether any node has a copy of that shard on its local disk. These async fetches are done per shard. When nodes are dropped from the cluster, shards become unassigned, and GatewayAllocator performs these async fetches when the nodes try to join back. In a cluster with a large number of shards (tens of thousands), this results in too many async fetches, causing the cluster manager JVM to spike.
In one cluster with more than 50K shards, the async fetches were taking 13GB of memory.

Expected behavior
These async fetches should be batched per node instead of being executed per shard per node.
OpenSearch Version : 1.1
Additional context
Also, these async fetches were discussed earlier in the context of taking up a lot of CPU while sending these requests during the reroute operation: https://github.com/elastic/elasticsearch/issues/57498. It should also be evaluated whether these requests should be sent from a different threadpool instead of blocking masterService#updateTask, which is single threaded and needed for processing cluster state updates quickly.
One suggestion: you can run a loop over the existing allocateUnassigned method from the current interface as the default implementation for the new batch method in ExistingShardsAllocator. That way you can move to the batch method without breaking any clients.

Thanks @amkhar, the details go way beyond my knowledge, but batching per node looks very logical.
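A rough sketch of that suggestion, assuming the existing per-shard allocateUnassigned(ShardRouting, RoutingAllocation, UnassignedAllocationHandler) signature; the interface and method names here are illustrative, not a final API:

```java
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;

// Hypothetical sketch: a batch entry point whose default implementation simply
// loops over the existing per-shard allocateUnassigned(...), so current
// implementations of the interface keep working unchanged.
public interface BatchedExistingShardsAllocator extends ExistingShardsAllocator {

    default void allocateUnassignedBatch(RoutingAllocation allocation, boolean primary) {
        RoutingNodes.UnassignedShards.UnassignedIterator iterator =
            allocation.routingNodes().unassigned().iterator();
        while (iterator.hasNext()) {
            ShardRouting shardRouting = iterator.next();
            if (shardRouting.primary() == primary) {
                // The iterator doubles as the UnassignedAllocationHandler that the
                // existing per-shard method expects.
                allocateUnassigned(shardRouting, allocation, iterator);
            }
        }
    }
}
```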
Overall Proposal
Context
During node-left and node-join we do a reroute to assign unassigned shards. To do this assignment, the Cluster Manager fetches the shard metadata asynchronously on a per-shard basis. This metadata contains information about the shard and its allocation; we need it to take a new allocation decision. reroute is the crucial method of AllocationService that takes care of this flow for the whole cluster.
public ClusterState reroute(ClusterState clusterState, String reason)
Once we receive the data from all the nodes (all nodes relevant for the shardId), we build the allocation decision and perform the reroute. [I'll explain the code flow in detail in an upcoming section.]
Problem
When there are 50K-100K shards, during cluster restart scenarios we end up making transport calls for each and every shard. All transport threads are doing this same work (the same code flow for different shardIds), which chokes the transport layer and makes the cluster unstable. An OOM kill is one possible outcome, since the calls come back to back. The spike in JVM heap is caused by the many DiscoveryNode objects that come back in the responses of the transport calls made to the data nodes. Since DiscoveryNode is part of the base response class of the transport call, it is always present in each shard call's response.
Acronyms: GA - GatewayAllocator, PSA - PrimaryShardAllocator, RSA - ReplicaShardAllocator, IPSA - InternalPrimaryShardAllocator
Current flow
Let’s understand the code flow a bit better.
Node left scenario
Coordinator.java has a method removeNode, which is the handler for any node failure. It submits a node-left task with executor type NodeRemovalClusterStateTaskExecutor:
https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java#L318-L320
The execute method of this particular executor triggers AllocationService:
https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/cluster/coordination/NodeRemovalClusterStateTaskExecutor.java#L123
The above method unassigns shards that are associated with a node which is no longer part of the cluster; the executor always passes the reroute flag as true.
It first fails all the shards present on the bad nodes, then triggers the reroute method of AllocationService.
https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java#L317-L318
reroute calls the internal private reroute method, which triggers the allocation of existing unassigned shards.
https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java#L539
https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java#L544-L570
Now, for each primary and replica shard, the allocateUnassigned method of the GatewayAllocator class is called, which triggers:
https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java#L168
innerAllocate calls the allocateUnassigned method of BaseGatewayShardAllocator, which tries to make an allocation decision:
https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java#L76
makeAllocationDecision is implemented separately for primary and replica in PSA and RSA respectively. PSA calls the fetchData method of IPSA (written in the GA file) to fetch the metadata for this shard from the data nodes using a transport call:
https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java#L114
GA maintains a map whose key is the shardId and whose value is a fetcher object; this fetcher object is used to actually make the transport call and fetch the data (a simplified sketch of this pattern follows the code link below).
https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java#L284-L297
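A minimal, self-contained sketch of this pattern (not the actual GatewayAllocator code, which keys AsyncShardFetch objects by ShardId):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified illustration of the per-shard fetch bookkeeping: one fetcher object is
// created and cached per shard, and each fetcher issues its own transport call.
final class PerShardFetchers {

    private final Map<String, Fetcher> fetchers = new ConcurrentHashMap<>();

    Fetcher fetcherFor(String shardId) {
        // With ~50K unassigned shards this map ends up holding ~50K fetchers,
        // each with its own request/response cycle and DiscoveryNode copies.
        return fetchers.computeIfAbsent(shardId, Fetcher::new);
    }

    static final class Fetcher {
        final String shardId;

        Fetcher(String shardId) {
            this.shardId = shardId;
        }
        // fetchData(...) would issue the per-shard transport call and cache results here.
    }
}
```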
The fetchData method is written in the AsyncShardFetch class. Since this is an async method, we trigger a reroute in the async response handler of this method. Once the data is returned, the reroute runs the flow again but does not call the data nodes again, because it sees that the required data is already in the cache. We keep looking in the cache and checking which nodes still need fetching via the following call within fetchData:
https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java#L139
Based on the result of this data (AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState), we build the set of nodes we can allocate this shard to and mark the allocation decision as YES. Based on this, BaseGatewayShardAllocator's allocateUnassigned method initializes the shard:
https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java#L83-L91
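The following toy example (plain Java, not OpenSearch classes) illustrates the fetch-then-reroute behaviour described above: the first pass finds no cached data and returns, and the async response callback re-runs the decision, which now hits the cache:

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class FetchThenRerouteDemo {

    private final Map<String, String> cache = new ConcurrentHashMap<>();

    void reroute(String shardId) {
        Optional<String> data = fetchData(shardId);
        if (data.isEmpty()) {
            System.out.println(shardId + ": data not fetched yet, decision deferred");
            return; // the async callback below will call reroute(shardId) again
        }
        System.out.println(shardId + ": allocation decision based on " + data.get());
    }

    private Optional<String> fetchData(String shardId) {
        String cached = cache.get(shardId);
        if (cached != null) {
            return Optional.of(cached);
        }
        // Simulate the per-shard transport call; on response, cache the data and re-reroute.
        CompletableFuture
            .supplyAsync(() -> "metadata-of-" + shardId)
            .thenAccept(response -> {
                cache.put(shardId, response);
                reroute(shardId);
            });
        return Optional.empty();
    }

    public static void main(String[] args) throws InterruptedException {
        FetchThenRerouteDemo demo = new FetchThenRerouteDemo();
        demo.reroute("shard-0");
        Thread.sleep(200); // let the async callback run
    }
}
```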
Node join scenario
JoinTaskExecutor has the code for the actual task that gets executed on node-join; it triggers a reroute as part of its flow when the cluster manager changes or a new node joins. When the cluster state is updated with existing nodes, applying the cluster state makes IndicesClusterStateService trigger the shard actions shard/started or shard/failed. Both the started and failed executors (in the ShardStateAction class) have a handler that triggers a reroute once the cluster state is published by the cluster manager:
https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java#L206-L209
Shard Started
https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java#L829-L836
Shard Failed
https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java#L546-L559
So we know that ultimately this triggers the whole flow of assigning unassigned shards.
Solution
The key bottleneck we found by profiling is too many calls of the same fetchData transport action (too much serialization/deserialization), as well as the in-memory load of too many DiscoveryNode objects from the moment a response is received until it is garbage collected. *More analysis of the heap dump needs to be done to gather more data points on the memory side.*
Phase 1: Batch calls for multiple shards
The first solution is to batch the calls for multiple shards, which today are made one per shard. This will require a different request type, since we will carry multiple data structures similar to what we send today for one shard; the response structure will also change accordingly. This batching can be done at the node level or with a fixed batch size, depending on performance or failure scenarios. A draft PR has already been opened, https://github.com/opensearch-project/OpenSearch/pull/7269 , with rough code for making the call at the node level. As we know, up to 1K shards can be present on a node today, so we can discuss further whether that number is okay or whether we need a configurable batch size to give the solution more resiliency. Based on that discussion the solution will change a little, as currently I am only suggesting changes for per-node-level calls.
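Purely as an illustration of the batching idea (names are hypothetical, not taken from the draft PR), a planner like the following could turn the per-shard calls into per-node batches, with batchSize controlling the node-level vs fixed-size trade-off discussed above:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: instead of one transport call per shard per node, group
// the shard ids into batches and send one request per node per batch. With
// batchSize >= the number of shards this degenerates to one request per node.
final class BatchedFetchPlanner {

    static final class NodeBatchRequest {
        final String nodeId;
        final List<String> shardIds;

        NodeBatchRequest(String nodeId, List<String> shardIds) {
            this.nodeId = nodeId;
            this.shardIds = shardIds;
        }
    }

    static List<NodeBatchRequest> plan(List<String> dataNodeIds, List<String> unassignedShardIds, int batchSize) {
        List<NodeBatchRequest> requests = new ArrayList<>();
        for (String nodeId : dataNodeIds) {
            // Any data node may hold a copy of any unassigned shard, so every node is
            // asked about the full list, chunked into fixed-size batches.
            for (int from = 0; from < unassignedShardIds.size(); from += batchSize) {
                int to = Math.min(from + batchSize, unassignedShardIds.size());
                requests.add(new NodeBatchRequest(nodeId, List.copyOf(unassignedShardIds.subList(from, to))));
            }
        }
        return requests;
    }
}
```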
Phase 2 : Reduce Discovery Node objects
We must reduce the number of DiscoveryNode objects, as they are created via new when we receive the response, as part of the code inside the super class (the base node response constructor).
Part 1:
We reduce the number of these objects to the number of calls we make: for example, if we batch 100 shards per call and send 1000 calls, only 1000 DiscoveryNode objects will be present, one per call depending on which node the call is sent to. This way we reduce DiscoveryNode objects as well as save on serialization/deserialization work. We also need to keep a version check (> 3.0) on the destination node so that BWC is maintained (as we know, older-version nodes won't have this handler).
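A hedged sketch of that BWC guard; the exact version constant is an assumption, and the surrounding helper logic is hypothetical:

```java
import org.opensearch.Version;
import org.opensearch.cluster.node.DiscoveryNode;

// Only nodes on or after the version that registers the new batched transport
// handler can receive the batched request; older nodes keep getting the existing
// per-shard fetch.
final class BatchedFetchVersionCheck {

    static boolean supportsBatchedFetch(DiscoveryNode node) {
        return node.getVersion().onOrAfter(Version.V_3_0_0); // version constant is an assumption
    }
}
```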
Changes required
We need to change the flow starting from AllocationService all the way down to the transport call (fetchData).
Surrounding changes: ExistingShardsAllocator, because today it has allocateUnassigned and that is written tightly for a single shard.

Core changes required for the new transport call:

AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> shardStores = fetchData(unassignedShard, allocation);

will change to a different type, for example along the lines of the illustrative sketch below.
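Purely illustrative shape for such a batched type (the names are assumptions, not the draft PR's actual API):

```java
import java.util.Collection;
import java.util.Map;

// Instead of one FetchResult per shard, a single call returns results for a whole
// batch of shards, keyed by shard id.
interface ShardBatchFetcher<R> {

    final class BatchFetchResult<R> {
        // shardId -> (nodeId -> that node's response for that shard)
        final Map<String, Map<String, R>> responsesByShard;

        BatchFetchResult(Map<String, Map<String, R>> responsesByShard) {
            this.responsesByShard = responsesByShard;
        }
    }

    BatchFetchResult<R> fetchData(Collection<String> shardIds);
}
```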
Changes for failure handling or correctness
https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java#L261-L270
Part 2:
We keep the DiscoveryNode objects limited to exactly the number of nodes. Since only a fixed number of nodes should exist in, join, or drop from the cluster, we keep a common cache of this information and refer to it (instead of calling new) whenever needed, based on the ephemeralId or some other relevant id. This way we can save memory. As the first solution already gives some relief in the number of DiscoveryNode objects, we can debate whether this is truly required based on the difference in gains between part 1 and part 2.
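A small sketch of this idea, assuming we key on the node's ephemeralId (any other stable id would work the same way):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.opensearch.cluster.node.DiscoveryNode;

// Keep one canonical DiscoveryNode per live node and reuse it instead of
// allocating a new instance for every response.
final class DiscoveryNodeInterner {

    private final Map<String, DiscoveryNode> byEphemeralId = new ConcurrentHashMap<>();

    /** Returns the cached instance for this node, storing the given one on first sight. */
    DiscoveryNode intern(DiscoveryNode node) {
        return byEphemeralId.computeIfAbsent(node.getEphemeralId(), id -> node);
    }

    /** Drop entries for nodes that have left the cluster so the cache stays bounded. */
    void remove(String ephemeralId) {
        byEphemeralId.remove(ephemeralId);
    }
}
```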
Impact
Benchmarking was run to verify that we scale well with the batching approach.
The cluster can scale up to 100-120K shards without facing any OOM kill, whereas the current code faces an OOM kill at 45K shards. [More details on the benchmarking will be attached below.]
Note: The current POC code also contains changes where we don't rely on the BaseNodeResponse class, so the number of DiscoveryNode objects is reduced to the number of calls only (and right now that is per node), so part 2 of the phase 2 solution is automatically covered. But based on the discussion of these approaches, I'll write the actual code.
Pending Items:
Note 2: Integration tests are failing; need to look at those.
AI: Also need to explore whether we should move the GA async fetch call to the generic threadpool instead of using the clusterManager updateTask thread.