Skip to content

Commit

Permalink
Optimized ClusterStatsIndices to precomute shard stats (opensearch-pr…
Browse files Browse the repository at this point in the history
…oject#14426)

* Optimize Cluster Stats Indices to precomute node level stats

Signed-off-by: Pranshu Shukla <[email protected]>
  • Loading branch information
Pranshu-S authored Jul 23, 2024
1 parent f85a58f commit 5026af6
Show file tree
Hide file tree
Showing 9 changed files with 584 additions and 38 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add persian_stem filter (([#14847](https://github.com/opensearch-project/OpenSearch/pull/14847)))
- Create listener to refresh search thread resource usage ([#14832](https://github.com/opensearch-project/OpenSearch/pull/14832))
- Add rest, transport layer changes for hot to warm tiering - dedicated setup (([#13980](https://github.com/opensearch-project/OpenSearch/pull/13980))
- Optimize Cluster Stats Indices to precomute node level stats ([#14426](https://github.com/opensearch-project/OpenSearch/pull/14426))

### Dependencies
- Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ public void testNodeCounts() {
Map<String, Integer> expectedCounts = getExpectedCounts(1, 1, 1, 1, 1, 0, 0);
int numNodes = randomIntBetween(1, 5);

ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertCounts(response.getNodesStats().getCounts(), total, expectedCounts);

for (int i = 0; i < numNodes; i++) {
Expand Down Expand Up @@ -153,7 +157,11 @@ public void testNodeCountsWithDeprecatedMasterRole() throws ExecutionException,
Map<String, Integer> expectedCounts = getExpectedCounts(0, 1, 1, 0, 0, 0, 0);

Client client = client();
ClusterStatsResponse response = client.admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client.admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertCounts(response.getNodesStats().getCounts(), total, expectedCounts);

Set<String> expectedRoles = Set.of(DiscoveryNodeRole.MASTER_ROLE.roleName());
Expand All @@ -176,15 +184,60 @@ private void assertShardStats(ClusterStatsIndices.ShardStats stats, int indices,
assertThat(stats.getReplication(), Matchers.equalTo(replicationFactor));
}

public void testIndicesShardStats() throws ExecutionException, InterruptedException {
public void testIndicesShardStatsWithoutNodeLevelAggregations() {
internalCluster().startNode();
ensureGreen();
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(false).get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));

prepareCreate("test1").setSettings(Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 1)).get();

response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(false).get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.YELLOW));
assertThat(response.indicesStats.getDocs().getCount(), Matchers.equalTo(0L));
assertThat(response.indicesStats.getIndexCount(), Matchers.equalTo(1));
assertShardStats(response.getIndicesStats().getShards(), 1, 2, 2, 0.0);

// add another node, replicas should get assigned
internalCluster().startNode();
ensureGreen();
index("test1", "type", "1", "f", "f");
refresh(); // make the doc visible
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(false).get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
assertThat(response.indicesStats.getDocs().getCount(), Matchers.equalTo(1L));
assertShardStats(response.getIndicesStats().getShards(), 1, 4, 2, 1.0);

prepareCreate("test2").setSettings(Settings.builder().put("number_of_shards", 3).put("number_of_replicas", 0)).get();
ensureGreen();
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(false).get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
assertThat(response.indicesStats.getIndexCount(), Matchers.equalTo(2));
assertShardStats(response.getIndicesStats().getShards(), 2, 7, 5, 2.0 / 5);

assertThat(response.getIndicesStats().getShards().getAvgIndexPrimaryShards(), Matchers.equalTo(2.5));
assertThat(response.getIndicesStats().getShards().getMinIndexPrimaryShards(), Matchers.equalTo(2));
assertThat(response.getIndicesStats().getShards().getMaxIndexPrimaryShards(), Matchers.equalTo(3));

assertThat(response.getIndicesStats().getShards().getAvgIndexShards(), Matchers.equalTo(3.5));
assertThat(response.getIndicesStats().getShards().getMinIndexShards(), Matchers.equalTo(3));
assertThat(response.getIndicesStats().getShards().getMaxIndexShards(), Matchers.equalTo(4));

assertThat(response.getIndicesStats().getShards().getAvgIndexReplication(), Matchers.equalTo(0.5));
assertThat(response.getIndicesStats().getShards().getMinIndexReplication(), Matchers.equalTo(0.0));
assertThat(response.getIndicesStats().getShards().getMaxIndexReplication(), Matchers.equalTo(1.0));

}

public void testIndicesShardStatsWithNodeLevelAggregations() {
internalCluster().startNode();
ensureGreen();
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(true).get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));

prepareCreate("test1").setSettings(Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 1)).get();

response = client().admin().cluster().prepareClusterStats().get();
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(true).get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.YELLOW));
assertThat(response.indicesStats.getDocs().getCount(), Matchers.equalTo(0L));
assertThat(response.indicesStats.getIndexCount(), Matchers.equalTo(1));
Expand All @@ -195,14 +248,14 @@ public void testIndicesShardStats() throws ExecutionException, InterruptedExcept
ensureGreen();
index("test1", "type", "1", "f", "f");
refresh(); // make the doc visible
response = client().admin().cluster().prepareClusterStats().get();
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(true).get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
assertThat(response.indicesStats.getDocs().getCount(), Matchers.equalTo(1L));
assertShardStats(response.getIndicesStats().getShards(), 1, 4, 2, 1.0);

prepareCreate("test2").setSettings(Settings.builder().put("number_of_shards", 3).put("number_of_replicas", 0)).get();
ensureGreen();
response = client().admin().cluster().prepareClusterStats().get();
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(true).get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
assertThat(response.indicesStats.getIndexCount(), Matchers.equalTo(2));
assertShardStats(response.getIndicesStats().getShards(), 2, 7, 5, 2.0 / 5);
Expand All @@ -225,7 +278,11 @@ public void testValuesSmokeScreen() throws IOException, ExecutionException, Inte
internalCluster().startNodes(randomIntBetween(1, 3));
index("test1", "type", "1", "f", "f");

ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
String msg = response.toString();
assertThat(msg, response.getTimestamp(), Matchers.greaterThan(946681200000L)); // 1 Jan 2000
assertThat(msg, response.indicesStats.getStore().getSizeInBytes(), Matchers.greaterThan(0L));
Expand Down Expand Up @@ -265,13 +322,21 @@ public void testAllocatedProcessors() throws Exception {
internalCluster().startNode(Settings.builder().put(OpenSearchExecutors.NODE_PROCESSORS_SETTING.getKey(), 7).build());
waitForNodes(1);

ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertThat(response.getNodesStats().getOs().getAllocatedProcessors(), equalTo(7));
}

public void testClusterStatusWhenStateNotRecovered() throws Exception {
internalCluster().startClusterManagerOnlyNode(Settings.builder().put("gateway.recover_after_nodes", 2).build());
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.RED));

if (randomBoolean()) {
Expand All @@ -281,14 +346,18 @@ public void testClusterStatusWhenStateNotRecovered() throws Exception {
}
// wait for the cluster status to settle
ensureGreen();
response = client().admin().cluster().prepareClusterStats().get();
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(randomBoolean()).get();
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.GREEN));
}

public void testFieldTypes() {
internalCluster().startNode();
ensureGreen();
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
assertTrue(response.getIndicesStats().getMappings().getFieldTypeStats().isEmpty());

Expand All @@ -301,7 +370,7 @@ public void testFieldTypes() {
+ "\"eggplant\":{\"type\":\"integer\"}}}}}"
)
.get();
response = client().admin().cluster().prepareClusterStats().get();
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(randomBoolean()).get();
assertThat(response.getIndicesStats().getMappings().getFieldTypeStats().size(), equalTo(3));
Set<IndexFeatureStats> stats = response.getIndicesStats().getMappings().getFieldTypeStats();
for (IndexFeatureStats stat : stats) {
Expand Down Expand Up @@ -329,7 +398,11 @@ public void testNodeRolesWithMasterLegacySettings() throws ExecutionException, I
Map<String, Integer> expectedCounts = getExpectedCounts(0, 1, 1, 0, 1, 0, 0);

Client client = client();
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get();
ClusterStatsResponse clusterStatsResponse = client.admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedCounts);

Set<String> expectedRoles = Set.of(
Expand Down Expand Up @@ -359,7 +432,11 @@ public void testNodeRolesWithClusterManagerRole() throws ExecutionException, Int
Map<String, Integer> expectedCounts = getExpectedCounts(0, 1, 1, 0, 1, 0, 0);

Client client = client();
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get();
ClusterStatsResponse clusterStatsResponse = client.admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedCounts);

Set<String> expectedRoles = Set.of(
Expand All @@ -383,7 +460,11 @@ public void testNodeRolesWithSeedDataNodeLegacySettings() throws ExecutionExcept
Map<String, Integer> expectedRoleCounts = getExpectedCounts(1, 1, 1, 0, 1, 0, 0);

Client client = client();
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get();
ClusterStatsResponse clusterStatsResponse = client.admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedRoleCounts);

Set<String> expectedRoles = Set.of(
Expand All @@ -410,7 +491,11 @@ public void testNodeRolesWithDataNodeLegacySettings() throws ExecutionException,
Map<String, Integer> expectedRoleCounts = getExpectedCounts(1, 1, 1, 0, 1, 0, 0);

Client client = client();
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get();
ClusterStatsResponse clusterStatsResponse = client.admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedRoleCounts);

Set<Set<String>> expectedNodesRoles = Set.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,26 +78,49 @@ public ClusterStatsIndices(List<ClusterStatsNodeResponse> nodeResponses, Mapping
this.segments = new SegmentsStats();

for (ClusterStatsNodeResponse r : nodeResponses) {
for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) {
ShardStats indexShardStats = countsPerIndex.get(shardStats.getShardRouting().getIndexName());
if (indexShardStats == null) {
indexShardStats = new ShardStats();
countsPerIndex.put(shardStats.getShardRouting().getIndexName(), indexShardStats);
// Aggregated response from the node
if (r.getAggregatedNodeLevelStats() != null) {

for (Map.Entry<String, ClusterStatsNodeResponse.AggregatedIndexStats> entry : r.getAggregatedNodeLevelStats().indexStatsMap
.entrySet()) {
ShardStats indexShardStats = countsPerIndex.get(entry.getKey());
if (indexShardStats == null) {
indexShardStats = new ShardStats(entry.getValue());
countsPerIndex.put(entry.getKey(), indexShardStats);
} else {
indexShardStats.addStatsFrom(entry.getValue());
}
}

indexShardStats.total++;

CommonStats shardCommonStats = shardStats.getStats();

if (shardStats.getShardRouting().primary()) {
indexShardStats.primaries++;
docs.add(shardCommonStats.docs);
docs.add(r.getAggregatedNodeLevelStats().commonStats.docs);
store.add(r.getAggregatedNodeLevelStats().commonStats.store);
fieldData.add(r.getAggregatedNodeLevelStats().commonStats.fieldData);
queryCache.add(r.getAggregatedNodeLevelStats().commonStats.queryCache);
completion.add(r.getAggregatedNodeLevelStats().commonStats.completion);
segments.add(r.getAggregatedNodeLevelStats().commonStats.segments);
} else {
// Default response from the node
for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) {
ShardStats indexShardStats = countsPerIndex.get(shardStats.getShardRouting().getIndexName());
if (indexShardStats == null) {
indexShardStats = new ShardStats();
countsPerIndex.put(shardStats.getShardRouting().getIndexName(), indexShardStats);
}

indexShardStats.total++;

CommonStats shardCommonStats = shardStats.getStats();

if (shardStats.getShardRouting().primary()) {
indexShardStats.primaries++;
docs.add(shardCommonStats.docs);
}
store.add(shardCommonStats.store);
fieldData.add(shardCommonStats.fieldData);
queryCache.add(shardCommonStats.queryCache);
completion.add(shardCommonStats.completion);
segments.add(shardCommonStats.segments);
}
store.add(shardCommonStats.store);
fieldData.add(shardCommonStats.fieldData);
queryCache.add(shardCommonStats.queryCache);
completion.add(shardCommonStats.completion);
segments.add(shardCommonStats.segments);
}
}

Expand Down Expand Up @@ -202,6 +225,11 @@ public static class ShardStats implements ToXContentFragment {

public ShardStats() {}

public ShardStats(ClusterStatsNodeResponse.AggregatedIndexStats aggregatedIndexStats) {
this.total = aggregatedIndexStats.total;
this.primaries = aggregatedIndexStats.primaries;
}

/**
* number of indices in the cluster
*/
Expand Down Expand Up @@ -329,6 +357,11 @@ public void addIndexShardCount(ShardStats indexShardCount) {
}
}

public void addStatsFrom(ClusterStatsNodeResponse.AggregatedIndexStats incomingStats) {
this.total += incomingStats.total;
this.primaries += incomingStats.primaries;
}

/**
* Inner Fields used for creating XContent and parsing
*
Expand Down
Loading

0 comments on commit 5026af6

Please sign in to comment.