Skip to content

Commit 95e2825

Browse files
committed
Optimized ClusterStatsIndices to precomute shard stats
Signed-off-by: Pranshu Shukla <[email protected]>
1 parent b4692c8 commit 95e2825

File tree

6 files changed

+255
-32
lines changed

6 files changed

+255
-32
lines changed

server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIT.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ public void testNodeCounts() {
114114
NodeRoleSettings.NODE_ROLES_SETTING.getKey(),
115115
roles.stream().map(DiscoveryNodeRole::roleName).collect(Collectors.toList())
116116
)
117+
.put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean())
117118
.build();
118119
internalCluster().startNode(settings);
119120
total++;
@@ -146,6 +147,7 @@ public void testNodeCountsWithDeprecatedMasterRole() throws ExecutionException,
146147
int total = 1;
147148
Settings settings = Settings.builder()
148149
.putList(NodeRoleSettings.NODE_ROLES_SETTING.getKey(), Collections.singletonList(DiscoveryNodeRole.MASTER_ROLE.roleName()))
150+
.put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean())
149151
.build();
150152
internalCluster().startNode(settings);
151153
waitForNodes(total);
@@ -177,7 +179,8 @@ private void assertShardStats(ClusterStatsIndices.ShardStats stats, int indices,
177179
}
178180

179181
public void testIndicesShardStats() throws ExecutionException, InterruptedException {
180-
internalCluster().startNode();
182+
Settings settings = Settings.builder().put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean()).build();
183+
internalCluster().startNode(settings);
181184
ensureGreen();
182185
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
183186
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
@@ -222,7 +225,8 @@ public void testIndicesShardStats() throws ExecutionException, InterruptedExcept
222225
}
223226

224227
public void testValuesSmokeScreen() throws IOException, ExecutionException, InterruptedException {
225-
internalCluster().startNodes(randomIntBetween(1, 3));
228+
Settings settings = Settings.builder().put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean()).build();
229+
internalCluster().startNodes(randomIntBetween(1, 3), settings);
226230
index("test1", "type", "1", "f", "f");
227231

228232
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
@@ -262,15 +266,25 @@ public void testValuesSmokeScreen() throws IOException, ExecutionException, Inte
262266

263267
public void testAllocatedProcessors() throws Exception {
264268
// start one node with 7 processors.
265-
internalCluster().startNode(Settings.builder().put(OpenSearchExecutors.NODE_PROCESSORS_SETTING.getKey(), 7).build());
269+
internalCluster().startNode(
270+
Settings.builder()
271+
.put(OpenSearchExecutors.NODE_PROCESSORS_SETTING.getKey(), 7)
272+
.put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean())
273+
.build()
274+
);
266275
waitForNodes(1);
267276

268277
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
269278
assertThat(response.getNodesStats().getOs().getAllocatedProcessors(), equalTo(7));
270279
}
271280

272281
public void testClusterStatusWhenStateNotRecovered() throws Exception {
273-
internalCluster().startClusterManagerOnlyNode(Settings.builder().put("gateway.recover_after_nodes", 2).build());
282+
internalCluster().startClusterManagerOnlyNode(
283+
Settings.builder()
284+
.put("gateway.recover_after_nodes", 2)
285+
.put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean())
286+
.build()
287+
);
274288
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
275289
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.RED));
276290

@@ -286,7 +300,8 @@ public void testClusterStatusWhenStateNotRecovered() throws Exception {
286300
}
287301

288302
public void testFieldTypes() {
289-
internalCluster().startNode();
303+
Settings settings = Settings.builder().put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean()).build();
304+
internalCluster().startNode(settings);
290305
ensureGreen();
291306
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
292307
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
@@ -321,6 +336,7 @@ public void testNodeRolesWithMasterLegacySettings() throws ExecutionException, I
321336
.put("node.master", true)
322337
.put("node.data", false)
323338
.put("node.ingest", false)
339+
.put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean())
324340
.build();
325341

326342
internalCluster().startNodes(legacyMasterSettings);
@@ -351,6 +367,7 @@ public void testNodeRolesWithClusterManagerRole() throws ExecutionException, Int
351367
DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName()
352368
)
353369
)
370+
.put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean())
354371
.build();
355372

356373
internalCluster().startNodes(clusterManagerNodeRoleSettings);
@@ -375,6 +392,7 @@ public void testNodeRolesWithSeedDataNodeLegacySettings() throws ExecutionExcept
375392
.put("node.master", true)
376393
.put("node.data", true)
377394
.put("node.ingest", false)
395+
.put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean())
378396
.build();
379397

380398
internalCluster().startNodes(legacySeedDataNodeSettings);
@@ -400,6 +418,7 @@ public void testNodeRolesWithDataNodeLegacySettings() throws ExecutionException,
400418
.put("node.master", false)
401419
.put("node.data", true)
402420
.put("node.ingest", false)
421+
.put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean())
403422
.build();
404423

405424
// can't start data-only node without assigning cluster-manager

server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIndices.java

Lines changed: 58 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@
3434

3535
import org.opensearch.action.admin.indices.stats.CommonStats;
3636
import org.opensearch.common.annotation.PublicApi;
37+
import org.opensearch.core.common.io.stream.StreamInput;
38+
import org.opensearch.core.common.io.stream.StreamOutput;
39+
import org.opensearch.core.common.io.stream.Writeable;
3740
import org.opensearch.core.xcontent.ToXContentFragment;
3841
import org.opensearch.core.xcontent.XContentBuilder;
3942
import org.opensearch.index.cache.query.QueryCacheStats;
@@ -55,7 +58,6 @@
5558
*/
5659
@PublicApi(since = "1.0.0")
5760
public class ClusterStatsIndices implements ToXContentFragment {
58-
5961
private int indexCount;
6062
private ShardStats shards;
6163
private DocsStats docs;
@@ -78,26 +80,42 @@ public ClusterStatsIndices(List<ClusterStatsNodeResponse> nodeResponses, Mapping
7880
this.segments = new SegmentsStats();
7981

8082
for (ClusterStatsNodeResponse r : nodeResponses) {
81-
for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) {
82-
ShardStats indexShardStats = countsPerIndex.get(shardStats.getShardRouting().getIndexName());
83-
if (indexShardStats == null) {
84-
indexShardStats = new ShardStats();
85-
countsPerIndex.put(shardStats.getShardRouting().getIndexName(), indexShardStats);
86-
}
87-
88-
indexShardStats.total++;
89-
90-
CommonStats shardCommonStats = shardStats.getStats();
91-
92-
if (shardStats.getShardRouting().primary()) {
93-
indexShardStats.primaries++;
94-
docs.add(shardCommonStats.docs);
83+
if (r.getNodeIndexShardStats() != null) {
84+
r.getNodeIndexShardStats().indexStatsMap.forEach(
85+
(index, indexCountStats) -> countsPerIndex.merge(index, indexCountStats, (v1, v2) -> {
86+
v1.addStatsFrom(v2);
87+
return v1;
88+
})
89+
);
90+
91+
docs.add(r.getNodeIndexShardStats().docs);
92+
store.add(r.getNodeIndexShardStats().store);
93+
fieldData.add(r.getNodeIndexShardStats().fieldData);
94+
queryCache.add(r.getNodeIndexShardStats().queryCache);
95+
completion.add(r.getNodeIndexShardStats().completion);
96+
segments.add(r.getNodeIndexShardStats().segments);
97+
} else {
98+
for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) {
99+
ShardStats indexShardStats = countsPerIndex.get(shardStats.getShardRouting().getIndexName());
100+
if (indexShardStats == null) {
101+
indexShardStats = new ShardStats();
102+
countsPerIndex.put(shardStats.getShardRouting().getIndexName(), indexShardStats);
103+
}
104+
105+
indexShardStats.total++;
106+
107+
CommonStats shardCommonStats = shardStats.getStats();
108+
109+
if (shardStats.getShardRouting().primary()) {
110+
indexShardStats.primaries++;
111+
docs.add(shardCommonStats.docs);
112+
}
113+
store.add(shardCommonStats.store);
114+
fieldData.add(shardCommonStats.fieldData);
115+
queryCache.add(shardCommonStats.queryCache);
116+
completion.add(shardCommonStats.completion);
117+
segments.add(shardCommonStats.segments);
95118
}
96-
store.add(shardCommonStats.store);
97-
fieldData.add(shardCommonStats.fieldData);
98-
queryCache.add(shardCommonStats.queryCache);
99-
completion.add(shardCommonStats.completion);
100-
segments.add(shardCommonStats.segments);
101119
}
102120
}
103121

@@ -185,7 +203,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
185203
* @opensearch.api
186204
*/
187205
@PublicApi(since = "1.0.0")
188-
public static class ShardStats implements ToXContentFragment {
206+
public static class ShardStats implements ToXContentFragment, Writeable {
189207

190208
int indices;
191209
int total;
@@ -202,6 +220,12 @@ public static class ShardStats implements ToXContentFragment {
202220

203221
public ShardStats() {}
204222

223+
public ShardStats(StreamInput in) throws IOException {
224+
indices = in.readVInt();
225+
total = in.readVInt();
226+
primaries = in.readVInt();
227+
}
228+
205229
/**
206230
* number of indices in the cluster
207231
*/
@@ -329,6 +353,19 @@ public void addIndexShardCount(ShardStats indexShardCount) {
329353
}
330354
}
331355

356+
public void addStatsFrom(ShardStats incomingStats) {
357+
this.total += incomingStats.getTotal();
358+
this.indices += incomingStats.getIndices();
359+
this.primaries += incomingStats.getPrimaries();
360+
}
361+
362+
@Override
363+
public void writeTo(StreamOutput out) throws IOException {
364+
out.writeVInt(indices);
365+
out.writeVInt(total);
366+
out.writeVInt(primaries);
367+
}
368+
332369
/**
333370
* Inner Fields used for creating XContent and parsing
334371
*

server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodeResponse.java

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232

3333
package org.opensearch.action.admin.cluster.stats;
3434

35+
import org.apache.logging.log4j.LogManager;
36+
import org.apache.logging.log4j.Logger;
37+
import org.opensearch.Version;
3538
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
3639
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
3740
import org.opensearch.action.admin.indices.stats.ShardStats;
@@ -50,11 +53,12 @@
5053
* @opensearch.internal
5154
*/
5255
public class ClusterStatsNodeResponse extends BaseNodeResponse {
53-
56+
private static final Logger log = LogManager.getLogger(ClusterStatsNodeResponse.class);
5457
private final NodeInfo nodeInfo;
5558
private final NodeStats nodeStats;
56-
private final ShardStats[] shardsStats;
59+
private ShardStats[] shardsStats;
5760
private ClusterHealthStatus clusterStatus;
61+
private NodeIndexShardStats nodeIndexShardStats;
5862

5963
public ClusterStatsNodeResponse(StreamInput in) throws IOException {
6064
super(in);
@@ -64,7 +68,12 @@ public ClusterStatsNodeResponse(StreamInput in) throws IOException {
6468
}
6569
this.nodeInfo = new NodeInfo(in);
6670
this.nodeStats = new NodeStats(in);
67-
shardsStats = in.readArray(ShardStats::new, ShardStats[]::new);
71+
if (in.getVersion().onOrAfter(Version.V_2_13_0)) {
72+
this.shardsStats = in.readOptionalArray(ShardStats::new, ShardStats[]::new);
73+
this.nodeIndexShardStats = in.readOptionalWriteable(NodeIndexShardStats::new);
74+
} else {
75+
this.shardsStats = in.readArray(ShardStats::new, ShardStats[]::new);
76+
}
6877
}
6978

7079
public ClusterStatsNodeResponse(
@@ -77,8 +86,27 @@ public ClusterStatsNodeResponse(
7786
super(node);
7887
this.nodeInfo = nodeInfo;
7988
this.nodeStats = nodeStats;
89+
this.clusterStatus = clusterStatus;
8090
this.shardsStats = shardsStats;
91+
}
92+
93+
public ClusterStatsNodeResponse(
94+
DiscoveryNode node,
95+
@Nullable ClusterHealthStatus clusterStatus,
96+
NodeInfo nodeInfo,
97+
NodeStats nodeStats,
98+
ShardStats[] shardsStats,
99+
boolean optimized
100+
) {
101+
super(node);
102+
this.nodeInfo = nodeInfo;
103+
this.nodeStats = nodeStats;
81104
this.clusterStatus = clusterStatus;
105+
if (optimized) {
106+
log.info(node.getVersion().toString());
107+
this.nodeIndexShardStats = new NodeIndexShardStats(node, shardsStats);
108+
}
109+
this.shardsStats = shardsStats;
82110
}
83111

84112
public NodeInfo nodeInfo() {
@@ -101,6 +129,10 @@ public ShardStats[] shardsStats() {
101129
return this.shardsStats;
102130
}
103131

132+
public NodeIndexShardStats getNodeIndexShardStats() {
133+
return nodeIndexShardStats;
134+
}
135+
104136
public static ClusterStatsNodeResponse readNodeResponse(StreamInput in) throws IOException {
105137
return new ClusterStatsNodeResponse(in);
106138
}
@@ -116,6 +148,16 @@ public void writeTo(StreamOutput out) throws IOException {
116148
}
117149
nodeInfo.writeTo(out);
118150
nodeStats.writeTo(out);
119-
out.writeArray(shardsStats);
151+
if (out.getVersion().onOrAfter(Version.V_2_13_0)) {
152+
if (nodeIndexShardStats != null) {
153+
out.writeOptionalArray(null);
154+
out.writeOptionalWriteable(nodeIndexShardStats);
155+
} else {
156+
out.writeOptionalArray(shardsStats);
157+
out.writeOptionalWriteable(null);
158+
}
159+
} else {
160+
out.writeArray(shardsStats);
161+
}
120162
}
121163
}

0 commit comments

Comments
 (0)