Skip to content

Commit 34445ef

Browse files
committed
feat(wildcard-cluster): Support wildcard for cluster names in anomaly detection jobs
- Updated time field and category field validation methods to query all remote indices when wildcard specified for cluster name - Added @Inject annotation for TransportService in multiple action handlers Signed-off-by: Tim Baker <[email protected]>
1 parent a6c1916 commit 34445ef

13 files changed

+177
-14
lines changed

src/main/java/org/opensearch/ad/rest/handler/ValidateAnomalyDetectorActionHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.opensearch.timeseries.model.Config;
2525
import org.opensearch.timeseries.transport.ValidateConfigResponse;
2626
import org.opensearch.timeseries.util.SecurityClientUtil;
27+
import org.opensearch.transport.*;
2728
import org.opensearch.transport.client.Client;
2829

2930
/**
@@ -57,6 +58,7 @@ public ValidateAnomalyDetectorActionHandler(
5758
ClusterService clusterService,
5859
Client client,
5960
SecurityClientUtil clientUtil,
61+
TransportService transportService,
6062
ADIndexManagement anomalyDetectionIndices,
6163
Config anomalyDetector,
6264
TimeValue requestTimeout,
@@ -76,7 +78,7 @@ public ValidateAnomalyDetectorActionHandler(
7678
clusterService,
7779
client,
7880
clientUtil,
79-
null,
81+
transportService,
8082
anomalyDetectionIndices,
8183
Config.NO_ID,
8284
null,

src/main/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ protected Processor<ValidateConfigResponse> createProcessor(Config detector, Val
7878
clusterService,
7979
client,
8080
clientUtil,
81+
transportService,
8182
indexManagement,
8283
detector,
8384
request.getRequestTimeout(),

src/main/java/org/opensearch/forecast/rest/handler/ValidateForecasterActionHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.opensearch.timeseries.model.Config;
1919
import org.opensearch.timeseries.transport.ValidateConfigResponse;
2020
import org.opensearch.timeseries.util.SecurityClientUtil;
21+
import org.opensearch.transport.*;
2122
import org.opensearch.transport.client.Client;
2223

2324
/**
@@ -53,6 +54,7 @@ public class ValidateForecasterActionHandler extends AbstractForecasterActionHan
5354

5455
public ValidateForecasterActionHandler(
5556
ClusterService clusterService,
57+
TransportService transportService,
5658
Client client,
5759
SecurityClientUtil clientUtil,
5860
ForecastIndexManagement forecastIndices,
@@ -74,7 +76,7 @@ public ValidateForecasterActionHandler(
7476
clusterService,
7577
client,
7678
clientUtil,
77-
null,
79+
transportService,
7880
forecastIndices,
7981
Config.NO_ID,
8082
null,

src/main/java/org/opensearch/forecast/transport/ValidateForecasterTransportAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public ValidateForecasterTransportAction(
7171
protected Processor<ValidateConfigResponse> createProcessor(Config forecaster, ValidateConfigRequest request, User user) {
7272
return new ValidateForecasterActionHandler(
7373
clusterService,
74+
transportService,
7475
client,
7576
clientUtil,
7677
indexManagement,

src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java

Lines changed: 152 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@
9595
import org.opensearch.timeseries.util.ParseUtils;
9696
import org.opensearch.timeseries.util.RestHandlerUtils;
9797
import org.opensearch.timeseries.util.SecurityClientUtil;
98-
import org.opensearch.transport.TransportService;
98+
import org.opensearch.transport.*;
9999
import org.opensearch.transport.client.Client;
100100

101101
import com.google.common.collect.Sets;
@@ -156,6 +156,7 @@ public abstract class AbstractTimeSeriesActionHandler<T extends ActionResponse,
156156
protected final ConfigUpdateConfirmer<IndexType, IndexManagementType, TaskCacheManagerType, TaskTypeEnum, TaskClass, TaskManagerType> handler;
157157
protected final ClusterService clusterService;
158158
protected final NamedXContentRegistry xContentRegistry;
159+
protected final TransportService transportService;
159160
protected final TimeValue requestTimeout;
160161
protected final WriteRequest.RefreshPolicy refreshPolicy;
161162
protected final Long seqNo;
@@ -217,6 +218,7 @@ public AbstractTimeSeriesActionHandler(
217218
this.method = method;
218219
this.clusterService = clusterService;
219220
this.xContentRegistry = xContentRegistry;
221+
this.transportService = transportService;
220222
this.requestTimeout = requestTimeout;
221223
this.refreshPolicy = refreshPolicy;
222224
this.seqNo = seqNo;
@@ -321,13 +323,86 @@ protected void validateName(boolean indexingDryRun, ActionListener<T> listener)
321323
listener.onFailure(createValidationException(AbstractTimeSeriesActionHandler.INVALID_NAME_SIZE, ValidationIssueType.NAME));
322324
return;
323325
}
324-
validateTimeField(indexingDryRun, listener);
326+
validateTimeFieldInAllClusters(indexingDryRun, listener);
325327
}
326328

327-
protected void validateTimeField(boolean indexingDryRun, ActionListener<T> listener) {
329+
protected void validateTimeFieldInAllClusters(boolean indexingDryRun, ActionListener<T> listener) {
330+
List<String> wildcardClusterIndices = config.getIndices().stream().filter(idx -> idx.startsWith("*:")).toList();
331+
List<String> nonWildcardClusterIndices = config.getIndices().stream().filter(idx -> !idx.startsWith("*:")).toList();
332+
333+
Map<String, List<String>> clusterIndicesMap = new HashMap<>();
334+
335+
// if no wildcard cluster indices found, fallback to typical validation
336+
if (wildcardClusterIndices.isEmpty()) {
337+
if (!nonWildcardClusterIndices.isEmpty()) {
338+
HashMap<String, List<String>> nonWildcardMap = CrossClusterConfigUtils
339+
.separateClusterIndexes(nonWildcardClusterIndices, clusterService);
340+
clusterIndicesMap.putAll(nonWildcardMap);
341+
validateTimeField(clusterIndicesMap, indexingDryRun, listener);
342+
} else {
343+
listener
344+
.onFailure(
345+
new ValidationException(
346+
"No indices specified for time field validation.",
347+
ValidationIssueType.TIMEFIELD_FIELD,
348+
configValidationAspect
349+
)
350+
);
351+
}
352+
return;
353+
}
354+
355+
// if there are wildcard cluster indices, check for remote clusters. throw exception if none configured
356+
RemoteClusterService remoteClusterService = transportService.getRemoteClusterService();
357+
Set<String> remoteClusters = remoteClusterService.getRegisteredRemoteClusterNames();
358+
359+
if (remoteClusters.isEmpty()) {
360+
listener
361+
.onFailure(
362+
new ValidationException(
363+
"Indices with wildcard cluster prefix specified, but no remote clusters are configured. Please configure remote clusters or remove the wildcard cluster prefix from the indices.",
364+
ValidationIssueType.TIMEFIELD_FIELD,
365+
configValidationAspect
366+
)
367+
);
368+
return;
369+
}
370+
371+
// For each remote cluster, add all indices matching the wildcard pattern
372+
for (String remote : remoteClusters) {
373+
List<String> remoteIndices = wildcardClusterIndices
374+
.stream()
375+
.map(idx -> idx.substring(2)) // remove "*:" prefix
376+
.toList();
377+
378+
if (!remoteIndices.isEmpty()) {
379+
clusterIndicesMap.put(remote, remoteIndices);
380+
}
381+
}
382+
383+
// add non-wildcard indices (local or explicit remote) to the map
384+
if (!nonWildcardClusterIndices.isEmpty()) {
385+
HashMap<String, List<String>> nonWildcardIndicesMap = CrossClusterConfigUtils
386+
.separateClusterIndexes(nonWildcardClusterIndices, clusterService);
387+
clusterIndicesMap.putAll(nonWildcardIndicesMap);
388+
}
389+
390+
if (clusterIndicesMap.isEmpty()) {
391+
listener
392+
.onFailure(
393+
new ValidationException(
394+
"No indices found for time field validation.",
395+
ValidationIssueType.TIMEFIELD_FIELD,
396+
configValidationAspect
397+
)
398+
);
399+
} else {
400+
validateTimeField(clusterIndicesMap, indexingDryRun, listener);
401+
}
402+
}
403+
404+
protected void validateTimeField(Map<String, List<String>> clusterIndicesMap, boolean indexingDryRun, ActionListener<T> listener) {
328405
String givenTimeField = config.getTimeField();
329-
HashMap<String, List<String>> clusterIndicesMap = CrossClusterConfigUtils
330-
.separateClusterIndexes(config.getIndices(), clusterService);
331406

332407
ActionListener<MergeableList<Optional<double[]>>> validateGetMappingForTimeFieldListener = ActionListener.wrap(response -> {
333408
prepareConfigIndexing(indexingDryRun, listener);
@@ -732,7 +807,7 @@ protected void validateAgainstExistingHCConfig(String configId, boolean indexing
732807
)
733808
);
734809
} else {
735-
validateCategoricalFieldsInAllIndices(configId, indexingDryRun, listener);
810+
validateCategoricalFieldsInAllClusters(configId, indexingDryRun, listener);
736811
}
737812

738813
}
@@ -794,18 +869,83 @@ protected void onSearchHCConfigResponse(SearchResponse response, String detector
794869
}
795870
listener.onFailure(new IllegalArgumentException(errorMsg));
796871
} else {
797-
validateCategoricalFieldsInAllIndices(detectorId, indexingDryRun, listener);
872+
validateCategoricalFieldsInAllClusters(detectorId, indexingDryRun, listener);
798873
}
799874
}
800875

801-
protected void validateCategoricalFieldsInAllIndices(String configId, boolean indexingDryRun, ActionListener<T> listener) {
802-
HashMap<String, List<String>> clusterIndicesMap = CrossClusterConfigUtils
803-
.separateClusterIndexes(config.getIndices(), clusterService);
876+
protected void validateCategoricalFieldsInAllClusters(String configId, boolean indexingDryRun, ActionListener<T> listener) {
877+
List<String> wildcardClusterIndices = config.getIndices().stream().filter(idx -> idx.startsWith("*:")).toList();
878+
List<String> nonWildcardClusterIndices = config.getIndices().stream().filter(idx -> !idx.startsWith("*:")).toList();
879+
880+
HashMap<String, List<String>> clusterIndicesMap = new HashMap<>();
881+
882+
// if no wildcard cluster indices found, fallback to typical validation
883+
if (wildcardClusterIndices.isEmpty()) {
884+
if (!nonWildcardClusterIndices.isEmpty()) {
885+
HashMap<String, List<String>> nonWildcardIndicesMap = CrossClusterConfigUtils
886+
.separateClusterIndexes(nonWildcardClusterIndices, clusterService);
887+
clusterIndicesMap.putAll(nonWildcardIndicesMap);
888+
validateCategoricalField(clusterIndicesMap.entrySet().iterator(), configId, indexingDryRun, listener);
889+
} else {
890+
listener
891+
.onFailure(
892+
new ValidationException(
893+
"No indices specified for categorical field validation.",
894+
ValidationIssueType.CATEGORY,
895+
configValidationAspect
896+
)
897+
);
898+
}
899+
return;
900+
}
901+
902+
// if there are wildcard cluster indices, check for remote clusters. throw exception if none configured
903+
RemoteClusterService remoteClusterService = transportService.getRemoteClusterService();
904+
Set<String> remoteClusters = remoteClusterService.getRegisteredRemoteClusterNames();
905+
906+
if (remoteClusters.isEmpty()) {
907+
listener
908+
.onFailure(
909+
new ValidationException(
910+
"Indices with wildcard cluster prefix specified, but no remote clusters are configured. Please configure remote clusters or remove the wildcard cluster prefix from the indices.",
911+
ValidationIssueType.CATEGORY,
912+
configValidationAspect
913+
)
914+
);
915+
return;
916+
}
917+
918+
// for each remote cluster, add all indices with wildcard cluster prefix
919+
for (String remote : remoteClusters) {
920+
List<String> remoteIndices = wildcardClusterIndices
921+
.stream()
922+
.map(idx -> idx.substring(2)) // remove "*:" prefix
923+
.toList();
804924

805-
Iterator<Map.Entry<String, List<String>>> iterator = clusterIndicesMap.entrySet().iterator();
925+
if (!remoteIndices.isEmpty()) {
926+
clusterIndicesMap.put(remote, remoteIndices);
927+
}
928+
}
806929

807-
validateCategoricalField(iterator, configId, indexingDryRun, listener);
930+
// add non-wildcard cluster indices (local or explicit remote) to the map
931+
if (!nonWildcardClusterIndices.isEmpty()) {
932+
HashMap<String, List<String>> nonWildcardIndicesMap = CrossClusterConfigUtils
933+
.separateClusterIndexes(nonWildcardClusterIndices, clusterService);
934+
clusterIndicesMap.putAll(nonWildcardIndicesMap);
935+
}
808936

937+
if (clusterIndicesMap.isEmpty()) {
938+
listener
939+
.onFailure(
940+
new ValidationException(
941+
"No indices found for categorical field validation.",
942+
ValidationIssueType.CATEGORY,
943+
configValidationAspect
944+
)
945+
);
946+
} else {
947+
validateCategoricalField(clusterIndicesMap.entrySet().iterator(), configId, indexingDryRun, listener);
948+
}
809949
}
810950

811951
protected void validateCategoricalField(

src/main/java/org/opensearch/timeseries/transport/BaseDeleteConfigTransportAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.opensearch.action.support.WriteRequest;
2929
import org.opensearch.ad.model.ADTask;
3030
import org.opensearch.cluster.service.ClusterService;
31+
import org.opensearch.common.inject.*;
3132
import org.opensearch.common.settings.Setting;
3233
import org.opensearch.common.settings.Settings;
3334
import org.opensearch.common.util.concurrent.ThreadContext;
@@ -73,6 +74,7 @@ public abstract class BaseDeleteConfigTransportAction<TaskCacheManagerType exten
7374
private final List<TaskTypeEnum> batchTaskTypes;
7475
protected final String configIndexName;
7576

77+
@Inject
7678
public BaseDeleteConfigTransportAction(
7779
TransportService transportService,
7880
ActionFilters actionFilters,

src/main/java/org/opensearch/timeseries/transport/BaseEntityProfileTransportAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.opensearch.action.support.HandledTransportAction;
2323
import org.opensearch.cluster.node.DiscoveryNode;
2424
import org.opensearch.cluster.service.ClusterService;
25+
import org.opensearch.common.inject.*;
2526
import org.opensearch.common.settings.Setting;
2627
import org.opensearch.common.settings.Settings;
2728
import org.opensearch.common.unit.TimeValue;
@@ -62,6 +63,7 @@ public class BaseEntityProfileTransportAction<RCFModelType extends ThresholdedRa
6263
private final CacheProviderType cacheProvider;
6364
private final String entityProfileAction;
6465

66+
@Inject
6567
public BaseEntityProfileTransportAction(
6668
ActionFilters actionFilters,
6769
TransportService transportService,

src/main/java/org/opensearch/timeseries/transport/BaseGetConfigTransportAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.opensearch.action.support.HandledTransportAction;
3636
import org.opensearch.cluster.service.ClusterService;
3737
import org.opensearch.common.CheckedConsumer;
38+
import org.opensearch.common.inject.*;
3839
import org.opensearch.common.settings.Setting;
3940
import org.opensearch.common.settings.Settings;
4041
import org.opensearch.common.util.concurrent.ThreadContext;
@@ -105,6 +106,7 @@ public abstract class BaseGetConfigTransportAction<GetConfigResponseType extends
105106
private final TaskProfileRunnerType taskProfileRunner;
106107
protected final String configIndexName;
107108

109+
@Inject
108110
public BaseGetConfigTransportAction(
109111
TransportService transportService,
110112
DiscoveryNodeFilterer nodeFilter,

src/main/java/org/opensearch/timeseries/transport/BaseJobTransportAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.opensearch.action.support.ActionFilters;
1919
import org.opensearch.action.support.HandledTransportAction;
2020
import org.opensearch.cluster.service.ClusterService;
21+
import org.opensearch.common.inject.*;
2122
import org.opensearch.common.settings.Setting;
2223
import org.opensearch.common.settings.Settings;
2324
import org.opensearch.common.unit.TimeValue;
@@ -60,6 +61,7 @@ public abstract class BaseJobTransportAction<IndexType extends Enum<IndexType> &
6061
private final Clock clock;
6162
private final Class<ConfigType> configTypeClass;
6263

64+
@Inject
6365
public BaseJobTransportAction(
6466
TransportService transportService,
6567
ActionFilters actionFilters,

src/main/java/org/opensearch/timeseries/transport/BaseValidateConfigTransportAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public abstract class BaseValidateConfigTransportAction<IndexType extends Enum<I
5757
protected final ClusterService clusterService;
5858
protected final NamedXContentRegistry xContentRegistry;
5959
protected final IndexManagementType indexManagement;
60+
protected final TransportService transportService;
6061
protected final SearchFeatureDao searchFeatureDao;
6162
protected final NamedWriteableRegistry namedWriteableRegistry;
6263
protected volatile Boolean filterByEnabled;
@@ -90,6 +91,7 @@ public BaseValidateConfigTransportAction(
9091
this.namedWriteableRegistry = namedWriteableRegistry;
9192
this.filterByEnabled = filterByBackendRoleSetting.get(settings);
9293
clusterService.getClusterSettings().addSettingsUpdateConsumer(filterByBackendRoleSetting, it -> filterByEnabled = it);
94+
this.transportService = transportService;
9395
this.searchFeatureDao = searchFeatureDao;
9496
this.clock = Clock.systemUTC();
9597
this.settings = settings;

0 commit comments

Comments
 (0)