OpenSearch: [BUG] Async shard fetches taking up GBs of memory causing ClusterManager JVM to spike with large no. of shards (> 50k)

Describe the bug GatewayAllocator performs an async fetch for unassigned shards to check whether any node has the shard data on its local disk. These async fetches are done per shard. When nodes are dropped from the cluster, shards become unassigned, and GatewayAllocator performs async fetches when the nodes try to join back. In a cluster with tens of thousands of shards this results in too many async fetches, causing the cluster manager JVM to spike.

In one cluster with > 50K shards, async fetches were taking 13 GB of memory.


Expected behavior These async fetches should be batched per node instead of being executed per shard per node.

OpenSearch Version : 1.1

Additional context These async fetches were also discussed earlier in the context of taking up a lot of CPU while sending the requests during a reroute operation: https://github.com/elastic/elasticsearch/issues/57498. It should also be evaluated whether these requests should be sent from a different threadpool instead of blocking masterService#updateTask, which is single threaded and needed for processing cluster state updates quickly.

About this issue

  • Original URL
  • State: open
  • Created 2 years ago
  • Comments: 24 (22 by maintainers)

Most upvoted comments

For now I only see one implementation, and this method is not overridden there. I think if a new ExistingShardsAllocator implementation is provided by a plugin, it would need to implement the extra methods we're planning to add now (allocation of shards in a batch per node).

@amkhar There could be custom plugin implementations that customers might be using in their private repos. And you can't introduce a breaking change in 2.x where they need to implement a new method in the interface.

One suggestion: as the default implementation for the new batch method in ExistingShardsAllocator, run a loop over the existing allocateUnassigned method from the current interface. That way you can move to the batch method without breaking any clients.
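Roughly something like this (just a sketch; the signatures below are simplified, and the real allocateUnassigned also works off the unassigned iterator):

public interface ExistingShardsAllocator {
    // existing per-shard method (simplified signature)
    void allocateUnassigned(ShardRouting shardRouting, RoutingAllocation allocation,
                            UnassignedAllocationHandler unassignedAllocationHandler);

    // new batch method with a backwards-compatible default implementation that simply
    // falls back to the per-shard path, so existing plugin implementations keep compiling
    default void allocateUnassignedBatch(List<ShardRouting> shards, RoutingAllocation allocation,
                                         UnassignedAllocationHandler unassignedAllocationHandler) {
        for (ShardRouting shard : shards) {
            allocateUnassigned(shard, allocation, unassignedAllocationHandler);
        }
    }
}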

@reta - would love some feedback.

Thanks @amkhar , the details go way beyond my knowledge but batching per node looks very logical

Overall Proposal

Context

During node-left and node-join we do a reroute to assign unassigned shards. To do this assignment, the Cluster Manager fetches the shard metadata asynchronously on a per-shard basis. This metadata contains information about the shard and its allocation, which we need in order to take a new allocation decision. reroute is the crucial method of AllocationService which takes care of this flow for the whole cluster.

public ClusterState reroute(ClusterState clusterState, String reason)

Once we receive the data from all the nodes (all nodes relevant for the shardId), we build the allocation decision and perform the reroute. [I'll explain the code flow in detail in the upcoming section.]

Problem

In case there are 50K/100K shards, during cluster restart scenarios we end up making transport calls for each and every shard. All transport threads end up doing the same work (the same code flow for different shardIds), which chokes the transport layer and makes the cluster unstable. An OOM kill is one possible outcome of this situation, as the calls come back to back. The spike in JVM heap is caused by the many DiscoveryNode objects that come back in the responses of the transport calls made to the data nodes. Since DiscoveryNode is part of the base transport response class, it is present in every per-shard response.

Acronyms: GA - GatewayAllocator, PSA - PrimaryShardAllocator, RSA - ReplicaShardAllocator, IPSA - InternalPrimaryShardAllocator

Current flow

Let’s understand the code flow a bit better.

Node left scenario

Coordinator.java has a method removeNode which is the handler for any node failure. It submits a node-left task with the executor type NodeRemovalClusterStateTaskExecutor.

https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java#L318-L320

The execute method of this executor triggers AllocationService:

https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/cluster/coordination/NodeRemovalClusterStateTaskExecutor.java#L123

The above method unassigns shards that are associated with a node which is no longer part of the cluster. The executor always passes the reroute flag as true.

This method first fails all the shards present on the removed nodes, then triggers the reroute method of AllocationService.

https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java#L317-L318

reroute calls an internal private reroute method which triggers the allocation of existing unassigned shards.

https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java#L539

https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java#L544-L570

Now, for each primary and replica shard, the allocateUnassigned method of the GatewayAllocator class is called, which triggers:

https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java#L168

innerAllocate calls the allocateUnassigned method of BaseGatewayShardAllocator, which tries to make an allocation decision:

https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java#L76

makeAllocationDecision is implemented separately for primary and replica in PSA and RSA respectively. PSA calls the fetchData method of IPSA (written in the GA file) to get the metadata about the shard from the data nodes using a transport call:

https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java#L114

GA maintains a map where the key is the shardId and the value is a fetcher object; this fetcher object is used to actually make the transport call and fetch the data.

https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java#L284-L297

The fetchData method lives in the AsyncShardFetch class. Since this is an async method, we trigger a reroute in the async response handler. Once the data has been returned, that reroute drives the flow again but won't call the data nodes a second time, because it sees that the required data is already in the cache; we keep checking the cache for which nodes still need to be fetched using the following call within fetchData:

https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java#L139
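As an illustration only, the pattern can be modelled like this (a simplified, self-contained sketch, not the actual AsyncShardFetch code; asyncFetch, onResponse and reroute are stand-in names):

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

abstract class PerShardFetcherSketch<T> {
    private final Map<String, T> cache = new ConcurrentHashMap<>();      // nodeId -> shard metadata
    private final Set<String> inFlight = ConcurrentHashMap.newKeySet();

    protected abstract void asyncFetch(String nodeId);  // send the real transport request
    protected abstract void reroute();                  // re-drive allocation

    // invoked by the transport response handler
    protected void onResponse(String nodeId, T data) {
        cache.put(nodeId, data);
        inFlight.remove(nodeId);
        reroute();                                      // allocation runs again and now hits the cache
    }

    // called from the allocation path; returns null until every node has responded
    Map<String, T> fetchData(Set<String> nodeIds) {
        for (String nodeId : nodeIds) {
            if (!cache.containsKey(nodeId) && inFlight.add(nodeId)) {
                asyncFetch(nodeId);                     // only ask nodes we have not asked yet
            }
        }
        return cache.keySet().containsAll(nodeIds) ? Map.copyOf(cache) : null;
    }
}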

Based on the result of this data (AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState), we build the set of nodes this shard can be allocated to and mark the allocation decision as YES. Based on this, BaseGatewayShardAllocator's allocateUnassigned method initializes the shard.

https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java#L83-L91

Node join scenario

JoinTaskExecutor has the code for the actual task which gets executed on node-join; it triggers a reroute as part of its flow in case the cluster manager has changed or a new node joins. When the cluster state is updated with existing nodes, IndicesClusterStateService, as part of applying the cluster state, triggers a shard action - shard/started or shard/failed. Both the started and failed executors (ShardStateAction class) have a handler that runs once the cluster state is published from the cluster manager and triggers a reroute:

https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java#L206-L209

Shard Started

https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java#L829-L836

Shard Failed

https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java#L546-L559

So we know ultimately it’ll trigger the whole flow of assigning unassigned shards.

Solution

The key bottleneck we found by profiling is the large number of calls to the same fetchData transport action (too much serialization/deserialization), as well as the in-memory load of too many DiscoveryNode objects from the moment a response is received until it is garbage collected. *More analysis of the heap dump needs to be done to gather more data points on the memory side.*

Phase 1: Batch calls for multiple shards

The first solution is to batch the call for multiple shards, which is done one per shard today. This requires a different request type, as we will send multiple data structures similar to what we send today for one shard; the response structure will also change accordingly. This batching can be done at the node level or with a fixed batch count, depending on performance and failure scenarios. A draft PR has already been opened (https://github.com/opensearch-project/OpenSearch/pull/7269) with rough code for making the call at the node level. Since today 1K shards can be present on a node, we can discuss further whether that number is okay or whether we need a configurable batch size to give more resiliency to the solution. Based on this discussion the solution will change a little, as currently I'm suggesting changes only for a per-node-level call.

Phase 2 : Reduce Discovery Node objects

We must reduce the number of DiscoveryNode objects, as they are created via new when we receive each response, in this code:

public NodeGatewayStartedShards(StreamInput in) throws IOException {
    super(in);
}

Code inside super:

protected BaseNodeResponse(StreamInput in) throws IOException {
    super(in);
    node = new DiscoveryNode(in);   // a new DiscoveryNode is deserialized for every per-shard node response
}

Part 1:

We reduce the number of these objects to the number of calls we make - for example, if we batch 100 shards per call and send 1000 calls, only 1000 DiscoveryNode objects will be created, one per call depending on which node the call is sent to. This way we reduce DiscoveryNode objects as well as save on serialization/deserialization work. We also need to keep a version check (>3.0) for the destination node so BWC is maintained (older-version nodes won't have this handler).
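To put rough numbers on the example above (illustrative only):

per-shard fetch (today)      : 100,000 shards -> 100,000 fetch calls, each responding node deserializing its own DiscoveryNode per call
batches of 100 shards        : 100,000 / 100  -> 1,000 fetch calls  -> roughly 1,000 DiscoveryNode objects
one batch per node (Phase 1) : one call per data node -> DiscoveryNode objects bounded by the number of nodes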

Changes required

We need to change the flow starting from AllocationService all the way down to the transport call (fetchData).

Surrounding changes :

  • Need to introduce a new method like allocateUnassignedBatch or allocateUnassignedPerNode in the ExistingShardsAllocator interface, because today it only has allocateUnassigned, which is tightly written for a single shard.
  • Use the newly written method in the allocateExistingUnassignedShards method, gated on version, to avoid BWC issues.
  • Construct a map like <Node, List<Shard>>, iterate over it and call allocateUnassignedPerNode with one entry of this map at a time (see the sketch after this list).
  • Write a new implementation of allocateUnassignedPerNode in GatewayAllocator which calls the allocateUnassignedPerNode of BaseGatewayShardAllocator.
  • Add a new method similar to makeAllocationDecision, as the existing makeAllocationDecision only makes sense for a single shard's allocation on specific nodes. Since the actual implementation of this method lives in PSA/RSA, they need to provide a new implementation that returns a List<AllocateUnassignedDecision>. [Will discuss later whether abstracting makeAllocationDecision all the way up to ExistingShardsAllocator would make sense.]
  • BaseGatewayShardAllocator then uses this list of decisions to do the actual allocation.
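A rough sketch of the iteration described above (groupShardsByNode and allocateUnassignedPerNode are hypothetical names from this proposal, not existing OpenSearch APIs, and how shards are grouped per node is left abstract):

// one batched pass per node instead of one allocateUnassigned call per shard
Map<DiscoveryNode, List<ShardRouting>> shardsPerNode = groupShardsByNode(allocation.routingNodes().unassigned());
for (Map.Entry<DiscoveryNode, List<ShardRouting>> entry : shardsPerNode.entrySet()) {
    gatewayAllocator.allocateUnassignedPerNode(entry.getKey(), entry.getValue(), allocation);
}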

Core changes required for the new transport call:

AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> shardStores = fetchData(unassignedShard, allocation);

  • This fetchData call needs to be changed to operate on a list of shards rather than a single shard, so a new method with a new implementation is required in both PSA and RSA.
  • As the fetcher objects of GA need to work on a per-node basis, the core map of per-shard fetchers
private final ConcurrentMap<
        ShardId,
        AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>> asyncFetchStarted = ConcurrentCollections
            .newConcurrentMap();

will change to a different type like

private final ConcurrentMap<
        NodeId,
        AsyncShardBatchFetch<TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch>> nodeLevelShardFetchers = ConcurrentCollections
            .newConcurrentMap();
  • Currently the request contains a shardId and a customDataPath; the new request will carry a list of <ShardId, CustomDataPath> pairs, or a map of the same in case a .get method is required in the implementation.
  • Similarly, the response currently contains an allocationId in NodeGatewayStartedShards; the new response would carry a mapping of <shardId, NodeGatewayStartedShards> (see the sketch after this list).
  • The key point is that the overall response from one node will carry a single DiscoveryNode object in the base class BaseNodeResponse, so for many shards (or all shards on a node) the number of DiscoveryNode objects in the responses is reduced anyway.
  • The implementation of the transport call on the node will be similar to the current nodeOperation code, but it will work on a number of shards, not just one.
  • The cache object, which tracks which node's fetch is in progress, done or failed, will be flipped to maintain the same information per shardId (in our node-level fetcher class's cache).
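A hedged sketch of the proposed request/response shapes (class names follow this proposal and are not existing OpenSearch classes; exact fields may differ):

// request sent to one data node: one entry per shard in the batch
class NodeGatewayStartedShardsBatchRequest {
    Map<ShardId, String> shardIdToCustomDataPath;
}

// response from that node: one map entry per shard, but still only a single
// DiscoveryNode for the whole batch (coming from BaseNodeResponse, one per node)
class NodeGatewayStartedShardsBatch {
    DiscoveryNode node;                                    // one per response, not one per shard
    Map<ShardId, NodeGatewayStartedShards> shardStates;    // per-shard started-shard metadata
}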

Changes for failure handling or correctness

  • We'll keep getting new sets of unassigned shards as more nodes go down. To handle that, we need to maintain an atomic set of the shards whose fetch is already in flight; the async success callback clears them from the set. This way we can always cleanly send a new request for a different set of shards if more nodes fail. Before sending a call, we must check this atomic/synchronized set and compute the diff (see the sketch below).
  • In case the result failed for a particular shard (since the request is executed against a single node it's hard to see partial failures, but still…), we'll either restart the fetch for retryable exceptions or permanently fail it, as is done today.

https://github.com/opensearch-project/OpenSearch/blob/90678c2d76ed1c31aebc85a23fe5be12f6e81417/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java#L261-L270
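A minimal, self-contained sketch of that bookkeeping (names and the generic key type are illustrative; in the real change the key would be OpenSearch's ShardId):

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class InFlightFetches<K> {                           // K stands in for ShardId
    private final Set<K> inFlight = ConcurrentHashMap.newKeySet();

    // returns only the shards not already being fetched, marking them in flight atomically
    Set<K> claim(Set<K> unassignedShards) {
        Set<K> toFetch = new HashSet<>();
        for (K shardId : unassignedShards) {
            if (inFlight.add(shardId)) {             // add() returns false if a fetch is already running
                toFetch.add(shardId);
            }
        }
        return toFetch;
    }

    // called from the async success (or permanent failure) handler
    void release(K shardId) {
        inFlight.remove(shardId);
    }
}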

Part 2:

We keep the number of DiscoveryNode objects limited to exactly the number of nodes. Since only a fixed set of nodes exists or joins/drops from the cluster, we can keep a common cache of this information and refer to it (instead of creating new objects) whenever needed, keyed on ephemeralId or some other relevant id. This way we can save memory. Since the first solution already gives some relief in the number of DiscoveryNode objects, we can debate whether this is truly required based on the gains of part 1 versus part 2.
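As an illustration of that idea, a shared cache could intern DiscoveryNode instances by ephemeral id (a sketch under the assumption that ephemeral id is a suitable key; this is not existing OpenSearch code):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.opensearch.cluster.node.DiscoveryNode;

class DiscoveryNodeCache {
    private final ConcurrentMap<String, DiscoveryNode> byEphemeralId = new ConcurrentHashMap<>();

    // return the shared instance for this node, keeping a single object per ephemeral id
    DiscoveryNode intern(DiscoveryNode fresh) {
        return byEphemeralId.computeIfAbsent(fresh.getEphemeralId(), id -> fresh);
    }

    // drop the entry when the node leaves, so a rejoin (with a new ephemeral id) is re-cached
    void evict(DiscoveryNode node) {
        byEphemeralId.remove(node.getEphemeralId());
    }
}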

Impact

Benchmarking was run to verify that we scale well with the batching approach.

The cluster can scale up to 100-120K shards without facing any OOM kill, whereas the current code hits OOM kills at 45K shards. [More details on benchmarking will be attached below.]

Note: The current POC code also contains changes where we don't rely on the BaseNodeResponse class, so the number of DiscoveryNode objects is reduced to the number of calls only, and right now that is one per node; so part 2 of the Phase 2 solution is automatically covered. Based on the discussion of these approaches, I'll write the actual code.

Pending Items:

Note 2: Integration tests are failing; need to look at those.

AI: Also need to explore whether we should move the GA async fetch call to the generic threadpool instead of using the clusterManager updateTask thread.