Skip to content

Commit 9f6c067

Browse files
authored
Add TopNSearchTasksLogger settings to Cluster Settings (opensearch-project#6716)
Signed-off-by: PritLadani <[email protected]>
1 parent 197086f commit 9f6c067

File tree

6 files changed

+80
-20
lines changed

6 files changed

+80
-20
lines changed

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

+3
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.opensearch.search.backpressure.settings.SearchTaskSettings;
5050
import org.opensearch.tasks.TaskManager;
5151
import org.opensearch.tasks.TaskResourceTrackingService;
52+
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
5253
import org.opensearch.watcher.ResourceWatcherService;
5354
import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
5455
import org.opensearch.action.admin.indices.close.TransportCloseIndexAction;
@@ -599,6 +600,8 @@ public void apply(Settings value, Settings current, Settings previous) {
599600
IndexingPressure.MAX_INDEXING_BYTES,
600601
TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED,
601602
TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED,
603+
TopNSearchTasksLogger.LOG_TOP_QUERIES_SIZE_SETTING,
604+
TopNSearchTasksLogger.LOG_TOP_QUERIES_FREQUENCY_SETTING,
602605
ClusterManagerTaskThrottler.THRESHOLD_SETTINGS,
603606
ClusterManagerTaskThrottler.BASE_DELAY_SETTINGS,
604607
ClusterManagerTaskThrottler.MAX_DELAY_SETTINGS,

server/src/main/java/org/opensearch/node/Node.java

+3
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
6060
import org.opensearch.search.pipeline.SearchPipelineService;
6161
import org.opensearch.tasks.TaskResourceTrackingService;
62+
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
6263
import org.opensearch.threadpool.RunnableTaskExecutionListener;
6364
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
6465
import org.opensearch.watcher.ResourceWatcherService;
@@ -860,6 +861,8 @@ protected Node(
860861
settingsModule.getClusterSettings(),
861862
taskHeaders
862863
);
864+
TopNSearchTasksLogger taskConsumer = new TopNSearchTasksLogger(settings, settingsModule.getClusterSettings());
865+
transportService.getTaskManager().registerTaskResourceConsumer(taskConsumer);
863866
if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) {
864867
this.extensionsManager.initializeServicesAndRestHandler(
865868
actionModule,

server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ void doRun() {
197197
}
198198

199199
for (TaskCancellation taskCancellation : getTaskCancellations(cancellableTasks)) {
200-
logger.debug(
200+
logger.warn(
201201
"[{} mode] cancelling task [{}] due to high resource consumption [{}]",
202202
mode.getName(),
203203
taskCancellation.getTask().getId(),

server/src/main/java/org/opensearch/tasks/TaskManager.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@
6060
import org.opensearch.common.util.concurrent.ConcurrentCollections;
6161
import org.opensearch.common.util.concurrent.ConcurrentMapLong;
6262
import org.opensearch.common.util.concurrent.ThreadContext;
63-
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
6463
import org.opensearch.threadpool.ThreadPool;
6564
import org.opensearch.transport.TcpChannel;
6665

@@ -69,6 +68,7 @@
6968
import java.util.Collection;
7069
import java.util.Collections;
7170
import java.util.HashMap;
71+
import java.util.HashSet;
7272
import java.util.Iterator;
7373
import java.util.List;
7474
import java.util.Map;
@@ -148,7 +148,11 @@ public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHea
148148
this.taskHeaders = new ArrayList<>(taskHeaders);
149149
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
150150
this.taskResourceConsumersEnabled = TASK_RESOURCE_CONSUMERS_ENABLED.get(settings);
151-
this.taskResourceConsumer = Set.of(new TopNSearchTasksLogger(settings));
151+
taskResourceConsumer = new HashSet<>();
152+
}
153+
154+
public void registerTaskResourceConsumer(Consumer<Task> consumer) {
155+
taskResourceConsumer.add(consumer);
152156
}
153157

154158
public void setTaskResultsService(TaskResultsService taskResultsService) {

server/src/main/java/org/opensearch/tasks/consumer/TopNSearchTasksLogger.java

+24-9
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.logging.log4j.Logger;
1313
import org.opensearch.action.search.SearchShardTask;
1414
import org.opensearch.common.collect.Tuple;
15+
import org.opensearch.common.settings.ClusterSettings;
1516
import org.opensearch.common.settings.Setting;
1617
import org.opensearch.common.settings.Settings;
1718
import org.opensearch.common.unit.TimeValue;
@@ -30,36 +31,41 @@
3031
*/
3132
public class TopNSearchTasksLogger implements Consumer<Task> {
3233
public static final String TASK_DETAILS_LOG_PREFIX = "task.detailslog";
33-
public static final String LOG_TOP_QUERIES_SIZE = "cluster.task.consumers.top_n.size";
34-
public static final String LOG_TOP_QUERIES_FREQUENCY = "cluster.task.consumers.top_n.frequency";
34+
private static final String LOG_TOP_QUERIES_SIZE = "cluster.task.consumers.top_n.size";
35+
private static final String LOG_TOP_QUERIES_FREQUENCY = "cluster.task.consumers.top_n.frequency";
3536

3637
private static final Logger SEARCH_TASK_DETAILS_LOGGER = LogManager.getLogger(TASK_DETAILS_LOG_PREFIX + ".search");
3738

3839
// number of memory expensive search tasks that are logged
39-
private static final Setting<Integer> LOG_TOP_QUERIES_SIZE_SETTING = Setting.intSetting(
40+
public static final Setting<Integer> LOG_TOP_QUERIES_SIZE_SETTING = Setting.intSetting(
4041
LOG_TOP_QUERIES_SIZE,
4142
10,
43+
1,
44+
100,
4245
Setting.Property.Dynamic,
4346
Setting.Property.NodeScope
4447
);
4548

4649
// frequency in which memory expensive search tasks are logged
47-
private static final Setting<TimeValue> LOG_TOP_QUERIES_FREQUENCY_SETTING = Setting.timeSetting(
50+
public static final Setting<TimeValue> LOG_TOP_QUERIES_FREQUENCY_SETTING = Setting.timeSetting(
4851
LOG_TOP_QUERIES_FREQUENCY,
4952
TimeValue.timeValueSeconds(60L),
53+
TimeValue.timeValueSeconds(60L),
5054
Setting.Property.Dynamic,
5155
Setting.Property.NodeScope
5256
);
5357

54-
private final int topQueriesSize;
55-
private final long topQueriesLogFrequencyInNanos;
58+
private volatile int topQueriesSize;
59+
private volatile long topQueriesLogFrequencyInNanos;
5660
private final Queue<Tuple<Long, SearchShardTask>> topQueries;
5761
private long lastReportedTimeInNanos = System.nanoTime();
5862

59-
public TopNSearchTasksLogger(Settings settings) {
63+
public TopNSearchTasksLogger(Settings settings, ClusterSettings clusterSettings) {
6064
this.topQueriesSize = LOG_TOP_QUERIES_SIZE_SETTING.get(settings);
6165
this.topQueriesLogFrequencyInNanos = LOG_TOP_QUERIES_FREQUENCY_SETTING.get(settings).getNanos();
6266
this.topQueries = new PriorityQueue<>(topQueriesSize, Comparator.comparingLong(Tuple::v1));
67+
clusterSettings.addSettingsUpdateConsumer(LOG_TOP_QUERIES_SIZE_SETTING, this::setLogTopQueriesSize);
68+
clusterSettings.addSettingsUpdateConsumer(LOG_TOP_QUERIES_FREQUENCY_SETTING, this::setTopQueriesLogFrequencyInNanos);
6369
}
6470

6571
/**
@@ -78,11 +84,12 @@ private synchronized void recordSearchTask(SearchShardTask searchTask) {
7884
publishTopNEvents();
7985
lastReportedTimeInNanos = System.nanoTime();
8086
}
81-
if (topQueries.size() >= topQueriesSize && topQueries.peek().v1() < memory_in_bytes) {
87+
int topQSize = topQueriesSize;
88+
if (topQueries.size() >= topQSize && topQueries.peek().v1() < memory_in_bytes) {
8289
// evict the element
8390
topQueries.poll();
8491
}
85-
if (topQueries.size() < topQueriesSize) {
92+
if (topQueries.size() < topQSize) {
8693
topQueries.offer(new Tuple<>(memory_in_bytes, searchTask));
8794
}
8895
}
@@ -97,4 +104,12 @@ private void logTopResourceConsumingQueries() {
97104
SEARCH_TASK_DETAILS_LOGGER.info(new SearchShardTaskDetailsLogMessage(topQuery.v2()));
98105
}
99106
}
107+
108+
private void setLogTopQueriesSize(int topQueriesSize) {
109+
this.topQueriesSize = topQueriesSize;
110+
}
111+
112+
void setTopQueriesLogFrequencyInNanos(TimeValue timeValue) {
113+
this.topQueriesLogFrequencyInNanos = timeValue.getNanos();
114+
}
100115
}

server/src/test/java/org/opensearch/tasks/consumer/TopNSearchTasksLoggerTests.java

+43-8
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,15 @@
1212
import org.apache.logging.log4j.LogManager;
1313
import org.apache.logging.log4j.Logger;
1414
import org.apache.logging.log4j.core.LogEvent;
15+
import org.junit.After;
1516
import org.junit.AfterClass;
1617
import org.junit.BeforeClass;
1718
import org.opensearch.action.search.SearchShardTask;
1819
import org.opensearch.common.logging.Loggers;
1920
import org.opensearch.common.logging.MockAppender;
21+
import org.opensearch.common.settings.ClusterSettings;
2022
import org.opensearch.common.settings.Settings;
23+
import org.opensearch.common.unit.TimeValue;
2124
import org.opensearch.tasks.ResourceStats;
2225
import org.opensearch.tasks.ResourceStatsType;
2326
import org.opensearch.tasks.ResourceUsageMetric;
@@ -26,8 +29,9 @@
2629

2730
import java.util.Collections;
2831

29-
import static org.opensearch.tasks.consumer.TopNSearchTasksLogger.LOG_TOP_QUERIES_FREQUENCY;
30-
import static org.opensearch.tasks.consumer.TopNSearchTasksLogger.LOG_TOP_QUERIES_SIZE;
32+
import static org.opensearch.tasks.consumer.TopNSearchTasksLogger.LOG_TOP_QUERIES_SIZE_SETTING;
33+
import static org.opensearch.tasks.consumer.TopNSearchTasksLogger.LOG_TOP_QUERIES_FREQUENCY_SETTING;
34+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
3135

3236
public class TopNSearchTasksLoggerTests extends OpenSearchSingleNodeTestCase {
3337
static MockAppender appender;
@@ -42,15 +46,34 @@ public static void init() throws IllegalAccessException {
4246
Loggers.addAppender(searchLogger, appender);
4347
}
4448

49+
@After
50+
public void cleanupAfterTest() {
51+
assertAcked(
52+
client().admin()
53+
.cluster()
54+
.prepareUpdateSettings()
55+
.setPersistentSettings(Settings.builder().putNull("*"))
56+
.setTransientSettings(Settings.builder().putNull("*"))
57+
);
58+
}
59+
4560
@AfterClass
4661
public static void cleanup() {
4762
Loggers.removeAppender(searchLogger, appender);
4863
appender.stop();
4964
}
5065

5166
public void testLoggerWithTasks() {
52-
final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, "0ms").build();
53-
topNSearchTasksLogger = new TopNSearchTasksLogger(settings);
67+
final Settings settings = Settings.builder()
68+
.put(LOG_TOP_QUERIES_SIZE_SETTING.getKey(), 1)
69+
.put(LOG_TOP_QUERIES_FREQUENCY_SETTING.getKey(), "60s")
70+
.build();
71+
topNSearchTasksLogger = new TopNSearchTasksLogger(
72+
settings,
73+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
74+
);
75+
// This setting overrides is just for testing purpose
76+
topNSearchTasksLogger.setTopQueriesLogFrequencyInNanos(TimeValue.timeValueMillis(0));
5477
generateTasks(5);
5578
LogEvent logEvent = appender.getLastEventAndReset();
5679
assertNotNull(logEvent);
@@ -59,16 +82,28 @@ public void testLoggerWithTasks() {
5982
}
6083

6184
public void testLoggerWithoutTasks() {
62-
final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, "500ms").build();
63-
topNSearchTasksLogger = new TopNSearchTasksLogger(settings);
85+
final Settings settings = Settings.builder()
86+
.put(LOG_TOP_QUERIES_SIZE_SETTING.getKey(), 1)
87+
.put(LOG_TOP_QUERIES_FREQUENCY_SETTING.getKey(), "60s")
88+
.build();
89+
topNSearchTasksLogger = new TopNSearchTasksLogger(
90+
settings,
91+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
92+
);
6493

6594
assertNull(appender.getLastEventAndReset());
6695
}
6796

6897
public void testLoggerWithHighFrequency() {
6998
// setting the frequency to a really large value and confirming that nothing gets written to log file.
70-
final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, "10m").build();
71-
topNSearchTasksLogger = new TopNSearchTasksLogger(settings);
99+
final Settings settings = Settings.builder()
100+
.put(LOG_TOP_QUERIES_SIZE_SETTING.getKey(), 1)
101+
.put(LOG_TOP_QUERIES_FREQUENCY_SETTING.getKey(), "10m")
102+
.build();
103+
topNSearchTasksLogger = new TopNSearchTasksLogger(
104+
settings,
105+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
106+
);
72107
generateTasks(5);
73108
generateTasks(2);
74109

0 commit comments

Comments
 (0)