@@ -66,10 +66,11 @@ public class AutoForceMergeManager extends AbstractLifecycleComponent {
6666 private ConfigurationValidator configurationValidator ;
6767 private NodeValidator nodeValidator ;
6868 private ShardValidator shardValidator ;
69+ private Integer allocatedProcessors ;
70+ private ResourceTrackerProvider .ResourceTrackers resourceTrackers ;
6971 private final ForceMergeManagerSettings forceMergeManagerSettings ;
7072 private final CommonStatsFlags flags = new CommonStatsFlags (CommonStatsFlags .Flag .Segments , CommonStatsFlags .Flag .Translog );
7173 private final Set <Integer > mergingShards ;
72- private Integer allocatedProcessors ;
7374
7475 private static final Logger logger = LogManager .getLogger (AutoForceMergeManager .class );
7576
@@ -96,6 +97,7 @@ protected void doStart() {
9697 this .nodeValidator = new NodeValidator ();
9798 this .shardValidator = new ShardValidator ();
9899 this .allocatedProcessors = OpenSearchExecutors .allocatedProcessors (clusterService .getSettings ());
100+ this .resourceTrackers = ResourceTrackerProvider .create (threadPool );
99101 }
100102
101103 @ Override
@@ -117,43 +119,65 @@ private void modifySchedulerInterval(TimeValue schedulerInterval) {
117119 }
118120
119121 private void triggerForceMerge () {
122+ if (isValidForForceMerge () == false ) {
123+ return ;
124+ }
125+ executeForceMergeOnShards ();
126+ }
127+
128+ private boolean isValidForForceMerge () {
120129 if (configurationValidator .hasWarmNodes () == false ) {
130+ resourceTrackers .stop ();
121131 logger .debug ("No warm nodes found. Skipping Auto Force merge." );
122- return ;
132+ return false ;
123133 }
124134 if (nodeValidator .validate ().isAllowed () == false ) {
125135 logger .debug ("Node capacity constraints are not allowing to trigger auto ForceMerge" );
126- return ;
136+ return false ;
127137 }
128- int iteration = nodeValidator .getMaxConcurrentForceMerges ();
138+ return true ;
139+ }
140+
141+ private void executeForceMergeOnShards () {
142+ int remainingIterations = nodeValidator .getMaxConcurrentForceMerges ();
129143 for (IndexShard shard : getShardsBasedOnSorting (indicesService )) {
130- if (iteration == 0 ) {
144+ if (remainingIterations == 0 || !nodeValidator .validate ().isAllowed ()) {
145+ if (remainingIterations > 0 ) {
146+ logger .debug ("Node conditions no longer suitable for force merge." );
147+ }
131148 break ;
132149 }
133- if (nodeValidator .validate ().isAllowed () == false ) {
134- logger .debug ("Node conditions no longer suitable for force merge." );
150+ remainingIterations --;
151+ executeForceMergeForShard (shard );
152+ if (!waitBetweenShards ()) {
135153 break ;
136154 }
137- iteration --;
138- CompletableFuture .runAsync (() -> {
139- try {
140- mergingShards .add (shard .shardId ().getId ());
141- shard .forceMerge (new ForceMergeRequest ().maxNumSegments (forceMergeManagerSettings .getSegmentCount ()));
142- logger .debug ("Merging is completed successfully for the shard {}" , shard .shardId ());
143- } catch (Exception e ) {
144- logger .error ("Error during force merge for shard {}\n Exception: {}" , shard .shardId (), e );
145- } finally {
146- mergingShards .remove (shard .shardId ().getId ());
147- }
148- }, threadPool .executor (ThreadPool .Names .FORCE_MERGE ));
149- logger .info ("Successfully triggered force merge for shard {}" , shard .shardId ());
155+ }
156+ }
157+
158+ private void executeForceMergeForShard (IndexShard shard ) {
159+ CompletableFuture .runAsync (() -> {
150160 try {
151- Thread .sleep (forceMergeManagerSettings .getForcemergeDelay ().getMillis ());
152- } catch (InterruptedException e ) {
153- Thread .currentThread ().interrupt ();
154- logger .error ("Timer was interrupted while waiting between shards" , e );
155- break ;
161+ mergingShards .add (shard .shardId ().getId ());
162+ shard .forceMerge (new ForceMergeRequest ().maxNumSegments (forceMergeManagerSettings .getSegmentCount ()));
163+ logger .debug ("Merging is completed successfully for the shard {}" , shard .shardId ());
164+ } catch (Exception e ) {
165+ logger .error ("Error during force merge for shard {}\n Exception: {}" , shard .shardId (), e );
166+ } finally {
167+ mergingShards .remove (shard .shardId ().getId ());
156168 }
169+ }, threadPool .executor (ThreadPool .Names .FORCE_MERGE ));
170+ logger .info ("Successfully triggered force merge for shard {}" , shard .shardId ());
171+ }
172+
173+ private boolean waitBetweenShards () {
174+ try {
175+ Thread .sleep (forceMergeManagerSettings .getForcemergeDelay ().getMillis ());
176+ return true ;
177+ } catch (InterruptedException e ) {
178+ Thread .currentThread ().interrupt ();
179+ logger .error ("Timer was interrupted while waiting between shards" , e );
180+ return false ;
157181 }
158182 }
159183
@@ -264,15 +288,14 @@ protected class NodeValidator implements ValidationStrategy {
264288
265289 @ Override
266290 public ValidationResult validate () {
291+ resourceTrackers .start ();
267292 if (isCpuUsageOverThreshold ()) {
268293 return new ValidationResult (false );
269294 }
270295 if (isDiskUsageOverThreshold ()) {
271296 return new ValidationResult (false );
272297 }
273- double jvmUsedPercent = jvmService .stats ().getMem ().getHeapUsedPercent ();
274- if (jvmUsedPercent >= forceMergeManagerSettings .getJvmThreshold ()) {
275- logger .debug ("JVM memory: {}% breached the threshold: {}" , jvmUsedPercent , forceMergeManagerSettings .getJvmThreshold ());
298+ if (isJvmUsageOverThreshold ()) {
276299 return new ValidationResult (false );
277300 }
278301 if (areForceMergeThreadsAvailable () == false ) {
@@ -291,24 +314,34 @@ private boolean areForceMergeThreadsAvailable() {
291314 return false ;
292315 }
293316
317+ private boolean isJvmUsageOverThreshold () {
318+ double jvmAverage = resourceTrackers .jvmFiveMinute .getAverage ();
319+ if (jvmAverage >= forceMergeManagerSettings .getJvmThreshold ()) {
320+ logger .debug ("JVM Average: 5m({}%) breached the threshold: {}" , jvmAverage , forceMergeManagerSettings .getJvmThreshold ());
321+ return true ;
322+ }
323+ jvmAverage = resourceTrackers .jvmOneMinute .getAverage ();
324+ if (jvmAverage >= forceMergeManagerSettings .getJvmThreshold ()) {
325+ logger .debug ("JVM Average: 1m({}%) breached the threshold: {}" , jvmAverage , forceMergeManagerSettings .getJvmThreshold ());
326+ return true ;
327+ }
328+ double jvmUsedPercent = jvmService .stats ().getMem ().getHeapUsedPercent ();
329+ if (jvmUsedPercent >= forceMergeManagerSettings .getJvmThreshold ()) {
330+ logger .debug ("JVM memory: {}% breached the threshold: {}" , jvmUsedPercent , forceMergeManagerSettings .getJvmThreshold ());
331+ return true ;
332+ }
333+ return false ;
334+ }
335+
294336 private boolean isCpuUsageOverThreshold () {
295- double [] loadAverage = osService .stats ().getCpu ().getLoadAverage ();
296- double loadAverage5m = (loadAverage [1 ] / (double ) allocatedProcessors ) * 100 ;
297- if (loadAverage5m >= forceMergeManagerSettings .getCpuThreshold ()) {
298- logger .debug (
299- "Load Average: 5m({}%) breached the threshold: {}" ,
300- loadAverage5m ,
301- forceMergeManagerSettings .getCpuThreshold ()
302- );
337+ double cpuAverage = resourceTrackers .cpuFiveMinute .getAverage ();
338+ if (cpuAverage >= forceMergeManagerSettings .getCpuThreshold ()) {
339+ logger .debug ("CPU Average: 5m({}%) breached the threshold: {}" , cpuAverage , forceMergeManagerSettings .getCpuThreshold ());
303340 return true ;
304341 }
305- double loadAverage1m = (loadAverage [0 ] / (double ) allocatedProcessors ) * 100 ;
306- if (loadAverage1m >= forceMergeManagerSettings .getCpuThreshold ()) {
307- logger .debug (
308- "Load Average: 1m({}%) breached the threshold: {}" ,
309- loadAverage1m ,
310- forceMergeManagerSettings .getCpuThreshold ()
311- );
342+ cpuAverage = resourceTrackers .cpuOneMinute .getAverage ();
343+ if (cpuAverage >= forceMergeManagerSettings .getCpuThreshold ()) {
344+ logger .debug ("CPU Average: 1m({}%) breached the threshold: {}" , cpuAverage , forceMergeManagerSettings .getCpuThreshold ());
312345 return true ;
313346 }
314347 double cpuPercent = osService .stats ().getCpu ().getPercent ();
@@ -445,6 +478,7 @@ protected boolean mustReschedule() {
445478 @ Override
446479 protected void runInternal () {
447480 if (configurationValidator .validate ().isAllowed () == false ) {
481+ resourceTrackers .stop ();
448482 return ;
449483 }
450484 triggerForceMerge ();
0 commit comments