Skip to content

Commit 19e5625

Browse files
author
Swetha Guptha
committed
Optimise memory for REST cluster health calls with inline shard aggregations for the cluster and indices levels.
Signed-off-by: Swetha Guptha <[email protected]>
1 parent 3fc0139 commit 19e5625

15 files changed

+635
-39
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4848
- Add prefix support to hashed prefix & infix path types on remote store ([#15557](https://github.com/opensearch-project/OpenSearch/pull/15557))
4949
- Optimise snapshot deletion to speed up snapshot deletion and creation ([#15568](https://github.com/opensearch-project/OpenSearch/pull/15568))
5050
- [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218))
51+
- Cluster health API memory optimisation based on health level ([#15492](https://github.com/opensearch-project/OpenSearch/pull/15492))
5152

5253
### Dependencies
5354
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))

server/src/internalClusterTest/java/org/opensearch/cluster/ClusterHealthIT.java

+153
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import org.opensearch.action.support.IndicesOptions;
3838
import org.opensearch.action.support.PlainActionFuture;
3939
import org.opensearch.cluster.health.ClusterHealthStatus;
40+
import org.opensearch.cluster.health.ClusterIndexHealth;
41+
import org.opensearch.cluster.health.ClusterShardHealth;
4042
import org.opensearch.cluster.metadata.IndexMetadata;
4143
import org.opensearch.cluster.routing.UnassignedInfo;
4244
import org.opensearch.cluster.service.ClusterService;
@@ -49,6 +51,7 @@
4951

5052
import java.util.ArrayList;
5153
import java.util.List;
54+
import java.util.Map;
5255
import java.util.concurrent.atomic.AtomicBoolean;
5356

5457
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
@@ -439,4 +442,154 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
439442
completionFuture.actionGet(TimeValue.timeValueSeconds(30));
440443
}
441444
}
445+
446+
public void testHealthAtLevelParamEqualsClusterIsHonoured() {
447+
createIndex(
448+
"test1",
449+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
450+
);
451+
ensureGreen();
452+
ClusterHealthResponse healthResponse = client().admin()
453+
.cluster()
454+
.prepareHealth()
455+
.setHonourClusterHealthLevel(true)
456+
.execute()
457+
.actionGet();
458+
assertEquals(ClusterHealthStatus.GREEN, healthResponse.getStatus());
459+
assertTrue(healthResponse.getIndices().isEmpty());
460+
assertEquals(1, healthResponse.getActiveShards());
461+
assertEquals(1, healthResponse.getActivePrimaryShards());
462+
assertEquals(0, healthResponse.getUnassignedShards());
463+
assertEquals(0, healthResponse.getInitializingShards());
464+
assertEquals(0, healthResponse.getRelocatingShards());
465+
assertEquals(0, healthResponse.getDelayedUnassignedShards());
466+
}
467+
468+
public void testHealthAtLevelParamEqualIndicesIsHonoured() {
469+
createIndex(
470+
"test1",
471+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
472+
);
473+
ensureGreen();
474+
ClusterHealthResponse healthResponse = client().admin()
475+
.cluster()
476+
.prepareHealth()
477+
.setLevel("indices")
478+
.setHonourClusterHealthLevel(true)
479+
.execute()
480+
.actionGet();
481+
assertEquals(ClusterHealthStatus.GREEN, healthResponse.getStatus());
482+
483+
assertEquals(1, healthResponse.getActiveShards());
484+
assertEquals(1, healthResponse.getActivePrimaryShards());
485+
assertEquals(0, healthResponse.getUnassignedShards());
486+
assertEquals(0, healthResponse.getInitializingShards());
487+
assertEquals(0, healthResponse.getRelocatingShards());
488+
assertEquals(0, healthResponse.getDelayedUnassignedShards());
489+
490+
Map<String, ClusterIndexHealth> indices = healthResponse.getIndices();
491+
assertFalse(indices.isEmpty());
492+
assertEquals(1, indices.size());
493+
for (Map.Entry<String, ClusterIndexHealth> indicesHealth : indices.entrySet()) {
494+
String indexName = indicesHealth.getKey();
495+
assertEquals("test1", indexName);
496+
ClusterIndexHealth indicesHealthValue = indicesHealth.getValue();
497+
assertEquals(1, indicesHealthValue.getActiveShards());
498+
assertEquals(1, indicesHealthValue.getActivePrimaryShards());
499+
assertEquals(0, indicesHealthValue.getInitializingShards());
500+
assertEquals(0, indicesHealthValue.getUnassignedShards());
501+
assertEquals(0, indicesHealthValue.getDelayedUnassignedShards());
502+
assertEquals(0, indicesHealthValue.getRelocatingShards());
503+
assertEquals(ClusterHealthStatus.GREEN, indicesHealthValue.getStatus());
504+
assertTrue(indicesHealthValue.getShards().isEmpty());
505+
}
506+
}
507+
508+
public void testHealthAtLevelParamEqualShardsIsHonoured() {
509+
createIndex(
510+
"test1",
511+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
512+
);
513+
ensureGreen(TimeValue.timeValueSeconds(120), "test1");
514+
createIndex(
515+
"test2",
516+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
517+
);
518+
ensureGreen(TimeValue.timeValueSeconds(120));
519+
client().admin()
520+
.indices()
521+
.prepareUpdateSettings()
522+
.setIndices("test2")
523+
.setSettings(Settings.builder().put("index.number_of_replicas", 2).build())
524+
.execute()
525+
.actionGet();
526+
ClusterHealthResponse healthResponse = client().admin()
527+
.cluster()
528+
.prepareHealth()
529+
.setLevel("shards")
530+
.setHonourClusterHealthLevel(true)
531+
.execute()
532+
.actionGet();
533+
assertEquals(ClusterHealthStatus.YELLOW, healthResponse.getStatus());
534+
535+
assertEquals(4, healthResponse.getActiveShards());
536+
assertEquals(2, healthResponse.getActivePrimaryShards());
537+
assertEquals(1, healthResponse.getUnassignedShards());
538+
assertEquals(0, healthResponse.getInitializingShards());
539+
assertEquals(0, healthResponse.getRelocatingShards());
540+
assertEquals(0, healthResponse.getDelayedUnassignedShards());
541+
542+
Map<String, ClusterIndexHealth> indices = healthResponse.getIndices();
543+
assertFalse(indices.isEmpty());
544+
assertEquals(2, indices.size());
545+
for (Map.Entry<String, ClusterIndexHealth> indicesHealth : indices.entrySet()) {
546+
String indexName = indicesHealth.getKey();
547+
boolean indexHasMoreReplicas = indexName.equals("test2");
548+
ClusterIndexHealth indicesHealthValue = indicesHealth.getValue();
549+
assertEquals(2, indicesHealthValue.getActiveShards());
550+
assertEquals(1, indicesHealthValue.getActivePrimaryShards());
551+
assertEquals(0, indicesHealthValue.getInitializingShards());
552+
assertEquals(indexHasMoreReplicas ? 1 : 0, indicesHealthValue.getUnassignedShards());
553+
assertEquals(0, indicesHealthValue.getDelayedUnassignedShards());
554+
assertEquals(0, indicesHealthValue.getRelocatingShards());
555+
assertEquals(indexHasMoreReplicas ? ClusterHealthStatus.YELLOW : ClusterHealthStatus.GREEN, indicesHealthValue.getStatus());
556+
Map<Integer, ClusterShardHealth> shards = indicesHealthValue.getShards();
557+
assertFalse(shards.isEmpty());
558+
assertEquals(1, shards.size());
559+
for (Map.Entry<Integer, ClusterShardHealth> shardHealth : shards.entrySet()) {
560+
ClusterShardHealth clusterShardHealth = shardHealth.getValue();
561+
assertEquals(2, clusterShardHealth.getActiveShards());
562+
assertEquals(indexHasMoreReplicas ? 1 : 0, clusterShardHealth.getUnassignedShards());
563+
assertEquals(0, clusterShardHealth.getDelayedUnassignedShards());
564+
assertEquals(0, clusterShardHealth.getRelocatingShards());
565+
assertEquals(0, clusterShardHealth.getInitializingShards());
566+
assertTrue(clusterShardHealth.isPrimaryActive());
567+
assertEquals(indexHasMoreReplicas ? ClusterHealthStatus.YELLOW : ClusterHealthStatus.GREEN, clusterShardHealth.getStatus());
568+
}
569+
}
570+
}
571+
572+
public void testHealthAtLevelParamEqualsAttributeAwarenessIsHonoured() {
573+
createIndex(
574+
"test1",
575+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
576+
);
577+
ensureGreen();
578+
ClusterHealthResponse healthResponse = client().admin()
579+
.cluster()
580+
.prepareHealth()
581+
.setLevel("awareness_attributes")
582+
.setHonourClusterHealthLevel(true)
583+
.execute()
584+
.actionGet();
585+
assertEquals(ClusterHealthStatus.GREEN, healthResponse.getStatus());
586+
assertTrue(healthResponse.getIndices().isEmpty());
587+
assertNotNull(healthResponse.getClusterAwarenessHealth());
588+
assertEquals(1, healthResponse.getActiveShards());
589+
assertEquals(1, healthResponse.getActivePrimaryShards());
590+
assertEquals(0, healthResponse.getUnassignedShards());
591+
assertEquals(0, healthResponse.getInitializingShards());
592+
assertEquals(0, healthResponse.getRelocatingShards());
593+
assertEquals(0, healthResponse.getDelayedUnassignedShards());
594+
}
442595
}

server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequest.java

+25
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,17 @@ public class ClusterHealthRequest extends ClusterManagerNodeReadRequest<ClusterH
7676
*/
7777
private Level level = Level.CLUSTER;
7878

79+
/**
80+
* This flag will be used by the TransportClusterHealthAction to decide if indices/shards info is required in the ClusterHealthResponse or not.
81+
* When the flag is disabled - indices/shards info will be returned in ClusterHealthResponse regardless of the health level requested.
82+
* When the flag is enabled - indices/shards info will be set according to health level requested.
83+
* For Level.CLUSTER (or) Level.AWARENESS_ATTRIBUTES - information on indices/shards will NOT be returned to the transport client.
84+
* For Level.INDICES - information on indices will be returned to the transport client.
85+
* For Level.SHARDS - information on indices and shards will be returned to the transport client.
86+
* By default, the flag is disabled.
87+
*/
88+
private boolean honourClusterHealthLevel = false;
89+
7990
public ClusterHealthRequest() {}
8091

8192
public ClusterHealthRequest(String... indices) {
@@ -104,6 +115,9 @@ public ClusterHealthRequest(StreamInput in) throws IOException {
104115
if (in.getVersion().onOrAfter(Version.V_2_6_0)) {
105116
ensureNodeWeighedIn = in.readBoolean();
106117
}
118+
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
119+
honourClusterHealthLevel = in.readBoolean();
120+
}
107121
}
108122

109123
@Override
@@ -139,6 +153,9 @@ public void writeTo(StreamOutput out) throws IOException {
139153
if (out.getVersion().onOrAfter(Version.V_2_6_0)) {
140154
out.writeBoolean(ensureNodeWeighedIn);
141155
}
156+
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
157+
out.writeBoolean(honourClusterHealthLevel);
158+
}
142159
}
143160

144161
@Override
@@ -337,6 +354,14 @@ public final boolean ensureNodeWeighedIn() {
337354
return ensureNodeWeighedIn;
338355
}
339356

357+
public boolean isHonourClusterHealthLevel() {
358+
return honourClusterHealthLevel;
359+
}
360+
361+
public void setHonourClusterHealthLevel(boolean honourClusterHealthLevel) {
362+
this.honourClusterHealthLevel = honourClusterHealthLevel;
363+
}
364+
340365
@Override
341366
public ActionRequestValidationException validate() {
342367
if (level.equals(Level.AWARENESS_ATTRIBUTES) && indices.length > 0) {

server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequestBuilder.java

+5
Original file line numberDiff line numberDiff line change
@@ -171,4 +171,9 @@ public final ClusterHealthRequestBuilder setEnsureNodeWeighedIn(boolean ensureNo
171171
request.ensureNodeWeighedIn(ensureNodeCommissioned);
172172
return this;
173173
}
174+
175+
public ClusterHealthRequestBuilder setHonourClusterHealthLevel(boolean honourClusterHealthLevel) {
176+
request.setHonourClusterHealthLevel(honourClusterHealthLevel);
177+
return this;
178+
}
174179
}

server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthResponse.java

+43
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,26 @@ public ClusterHealthResponse(
237237
this.clusterHealthStatus = clusterStateHealth.getStatus();
238238
}
239239

240+
public ClusterHealthResponse(
241+
String clusterName,
242+
String[] concreteIndices,
243+
ClusterHealthRequest clusterHealthRequest,
244+
ClusterState clusterState,
245+
int numberOfPendingTasks,
246+
int numberOfInFlightFetch,
247+
TimeValue taskMaxWaitingTime
248+
) {
249+
this.clusterName = clusterName;
250+
this.numberOfPendingTasks = numberOfPendingTasks;
251+
this.numberOfInFlightFetch = numberOfInFlightFetch;
252+
this.taskMaxWaitingTime = taskMaxWaitingTime;
253+
this.clusterStateHealth = clusterHealthRequest.isHonourClusterHealthLevel()
254+
? new ClusterStateHealth(clusterState, concreteIndices, clusterHealthRequest.level())
255+
: new ClusterStateHealth(clusterState, concreteIndices);
256+
this.clusterHealthStatus = clusterStateHealth.getStatus();
257+
this.delayedUnassignedShards = clusterStateHealth.getDelayedUnassignedShards();
258+
}
259+
240260
// Awareness Attribute health
241261
public ClusterHealthResponse(
242262
String clusterName,
@@ -261,6 +281,29 @@ public ClusterHealthResponse(
261281
this.clusterAwarenessHealth = new ClusterAwarenessHealth(clusterState, clusterSettings, awarenessAttributeName);
262282
}
263283

284+
public ClusterHealthResponse(
285+
String clusterName,
286+
ClusterHealthRequest clusterHealthRequest,
287+
ClusterState clusterState,
288+
ClusterSettings clusterSettings,
289+
String[] concreteIndices,
290+
String awarenessAttributeName,
291+
int numberOfPendingTasks,
292+
int numberOfInFlightFetch,
293+
TimeValue taskMaxWaitingTime
294+
) {
295+
this(
296+
clusterName,
297+
concreteIndices,
298+
clusterHealthRequest,
299+
clusterState,
300+
numberOfPendingTasks,
301+
numberOfInFlightFetch,
302+
taskMaxWaitingTime
303+
);
304+
this.clusterAwarenessHealth = new ClusterAwarenessHealth(clusterState, clusterSettings, awarenessAttributeName);
305+
}
306+
264307
/**
265308
* For XContent Parser and serialization tests
266309
*/

server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java

+9-5
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
5353
import org.opensearch.cluster.node.DiscoveryNode;
5454
import org.opensearch.cluster.routing.NodeWeighedAwayException;
55-
import org.opensearch.cluster.routing.UnassignedInfo;
5655
import org.opensearch.cluster.routing.WeightedRoutingUtils;
5756
import org.opensearch.cluster.routing.allocation.AllocationService;
5857
import org.opensearch.cluster.service.ClusterService;
@@ -487,7 +486,12 @@ private ClusterHealthResponse clusterHealth(
487486
TimeValue pendingTaskTimeInQueue
488487
) {
489488
if (logger.isTraceEnabled()) {
490-
logger.trace("Calculating health based on state version [{}]", clusterState.version());
489+
logger.trace(
490+
"Calculating health based on state version [{}] for health level [{}] and honourClusterHealthLevel set to [{}]",
491+
clusterState.version(),
492+
request.level(),
493+
request.isHonourClusterHealthLevel()
494+
);
491495
}
492496

493497
String[] concreteIndices;
@@ -496,13 +500,13 @@ private ClusterHealthResponse clusterHealth(
496500
concreteIndices = clusterState.getMetadata().getConcreteAllIndices();
497501
return new ClusterHealthResponse(
498502
clusterState.getClusterName().value(),
503+
request,
499504
clusterState,
500505
clusterService.getClusterSettings(),
501506
concreteIndices,
502507
awarenessAttribute,
503508
numberOfPendingTasks,
504509
numberOfInFlightFetch,
505-
UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
506510
pendingTaskTimeInQueue
507511
);
508512
}
@@ -514,10 +518,10 @@ private ClusterHealthResponse clusterHealth(
514518
ClusterHealthResponse response = new ClusterHealthResponse(
515519
clusterState.getClusterName().value(),
516520
Strings.EMPTY_ARRAY,
521+
request,
517522
clusterState,
518523
numberOfPendingTasks,
519524
numberOfInFlightFetch,
520-
UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
521525
pendingTaskTimeInQueue
522526
);
523527
response.setStatus(ClusterHealthStatus.RED);
@@ -527,10 +531,10 @@ private ClusterHealthResponse clusterHealth(
527531
return new ClusterHealthResponse(
528532
clusterState.getClusterName().value(),
529533
concreteIndices,
534+
request,
530535
clusterState,
531536
numberOfPendingTasks,
532537
numberOfInFlightFetch,
533-
UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
534538
pendingTaskTimeInQueue
535539
);
536540
}

server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
import org.apache.lucene.store.AlreadyClosedException;
3636
import org.opensearch.action.FailedNodeException;
37+
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
3738
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
3839
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
3940
import org.opensearch.action.admin.indices.stats.CommonStats;
@@ -209,7 +210,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
209210

210211
ClusterHealthStatus clusterStatus = null;
211212
if (clusterService.state().nodes().isLocalNodeElectedClusterManager()) {
212-
clusterStatus = new ClusterStateHealth(clusterService.state()).getStatus();
213+
clusterStatus = new ClusterStateHealth(clusterService.state(), ClusterHealthRequest.Level.CLUSTER).getStatus();
213214
}
214215

215216
return new ClusterStatsNodeResponse(

0 commit comments

Comments
 (0)