Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.timeseries.model.Config;
import org.opensearch.timeseries.transport.ValidateConfigResponse;
import org.opensearch.timeseries.util.SecurityClientUtil;
import org.opensearch.transport.*;
import org.opensearch.transport.client.Client;

/**
Expand Down Expand Up @@ -57,6 +58,7 @@ public ValidateAnomalyDetectorActionHandler(
ClusterService clusterService,
Client client,
SecurityClientUtil clientUtil,
TransportService transportService,
ADIndexManagement anomalyDetectionIndices,
Config anomalyDetector,
TimeValue requestTimeout,
Expand All @@ -76,7 +78,7 @@ public ValidateAnomalyDetectorActionHandler(
clusterService,
client,
clientUtil,
null,
transportService,
anomalyDetectionIndices,
Config.NO_ID,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ protected Processor<ValidateConfigResponse> createProcessor(Config detector, Val
clusterService,
client,
clientUtil,
transportService,
indexManagement,
detector,
request.getRequestTimeout(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.timeseries.model.Config;
import org.opensearch.timeseries.transport.ValidateConfigResponse;
import org.opensearch.timeseries.util.SecurityClientUtil;
import org.opensearch.transport.*;
import org.opensearch.transport.client.Client;

/**
Expand Down Expand Up @@ -53,6 +54,7 @@ public class ValidateForecasterActionHandler extends AbstractForecasterActionHan

public ValidateForecasterActionHandler(
ClusterService clusterService,
TransportService transportService,
Client client,
SecurityClientUtil clientUtil,
ForecastIndexManagement forecastIndices,
Expand All @@ -74,7 +76,7 @@ public ValidateForecasterActionHandler(
clusterService,
client,
clientUtil,
null,
transportService,
forecastIndices,
Config.NO_ID,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public ValidateForecasterTransportAction(
protected Processor<ValidateConfigResponse> createProcessor(Config forecaster, ValidateConfigRequest request, User user) {
return new ValidateForecasterActionHandler(
clusterService,
transportService,
client,
clientUtil,
indexManagement,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
import org.opensearch.timeseries.util.ParseUtils;
import org.opensearch.timeseries.util.RestHandlerUtils;
import org.opensearch.timeseries.util.SecurityClientUtil;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.*;
import org.opensearch.transport.client.Client;

import com.google.common.collect.Sets;
Expand Down Expand Up @@ -156,6 +156,7 @@ public abstract class AbstractTimeSeriesActionHandler<T extends ActionResponse,
protected final ConfigUpdateConfirmer<IndexType, IndexManagementType, TaskCacheManagerType, TaskTypeEnum, TaskClass, TaskManagerType> handler;
protected final ClusterService clusterService;
protected final NamedXContentRegistry xContentRegistry;
protected final TransportService transportService;
protected final TimeValue requestTimeout;
protected final WriteRequest.RefreshPolicy refreshPolicy;
protected final Long seqNo;
Expand Down Expand Up @@ -217,6 +218,7 @@ public AbstractTimeSeriesActionHandler(
this.method = method;
this.clusterService = clusterService;
this.xContentRegistry = xContentRegistry;
this.transportService = transportService;
this.requestTimeout = requestTimeout;
this.refreshPolicy = refreshPolicy;
this.seqNo = seqNo;
Expand Down Expand Up @@ -321,13 +323,86 @@ protected void validateName(boolean indexingDryRun, ActionListener<T> listener)
listener.onFailure(createValidationException(AbstractTimeSeriesActionHandler.INVALID_NAME_SIZE, ValidationIssueType.NAME));
return;
}
validateTimeField(indexingDryRun, listener);
validateTimeFieldInAllClusters(indexingDryRun, listener);
}

protected void validateTimeField(boolean indexingDryRun, ActionListener<T> listener) {
protected void validateTimeFieldInAllClusters(boolean indexingDryRun, ActionListener<T> listener) {
List<String> wildcardClusterIndices = config.getIndices().stream().filter(idx -> idx.startsWith("*:")).toList();
List<String> nonWildcardClusterIndices = config.getIndices().stream().filter(idx -> !idx.startsWith("*:")).toList();

Map<String, List<String>> clusterIndicesMap = new HashMap<>();

// if no wildcard cluster indices found, fallback to typical validation
if (wildcardClusterIndices.isEmpty()) {
if (!nonWildcardClusterIndices.isEmpty()) {
HashMap<String, List<String>> nonWildcardMap = CrossClusterConfigUtils
.separateClusterIndexes(nonWildcardClusterIndices, clusterService);
clusterIndicesMap.putAll(nonWildcardMap);
validateTimeField(clusterIndicesMap, indexingDryRun, listener);
} else {
listener
.onFailure(
new ValidationException(
"No indices specified for time field validation.",
ValidationIssueType.TIMEFIELD_FIELD,
configValidationAspect
)
);
}
return;
}

// if there are wildcard cluster indices, check for remote clusters. throw exception if none configured
RemoteClusterService remoteClusterService = transportService.getRemoteClusterService();
Set<String> remoteClusters = remoteClusterService.getRegisteredRemoteClusterNames();

if (remoteClusters.isEmpty()) {
listener
.onFailure(
new ValidationException(
"Indices with wildcard cluster prefix specified, but no remote clusters are configured. Please configure remote clusters or remove the wildcard cluster prefix from the indices.",
ValidationIssueType.TIMEFIELD_FIELD,
configValidationAspect
)
);
return;
}

// For each remote cluster, add all indices matching the wildcard pattern
for (String remote : remoteClusters) {
List<String> remoteIndices = wildcardClusterIndices
.stream()
.map(idx -> idx.substring(2)) // remove "*:" prefix
.toList();

if (!remoteIndices.isEmpty()) {
clusterIndicesMap.put(remote, remoteIndices);
}
}

// add non-wildcard indices (local or explicit remote) to the map
if (!nonWildcardClusterIndices.isEmpty()) {
HashMap<String, List<String>> nonWildcardIndicesMap = CrossClusterConfigUtils
.separateClusterIndexes(nonWildcardClusterIndices, clusterService);
clusterIndicesMap.putAll(nonWildcardIndicesMap);
}

if (clusterIndicesMap.isEmpty()) {
listener
.onFailure(
new ValidationException(
"No indices found for time field validation.",
ValidationIssueType.TIMEFIELD_FIELD,
configValidationAspect
)
);
} else {
validateTimeField(clusterIndicesMap, indexingDryRun, listener);
}
}

protected void validateTimeField(Map<String, List<String>> clusterIndicesMap, boolean indexingDryRun, ActionListener<T> listener) {
String givenTimeField = config.getTimeField();
HashMap<String, List<String>> clusterIndicesMap = CrossClusterConfigUtils
.separateClusterIndexes(config.getIndices(), clusterService);

ActionListener<MergeableList<Optional<double[]>>> validateGetMappingForTimeFieldListener = ActionListener.wrap(response -> {
prepareConfigIndexing(indexingDryRun, listener);
Expand Down Expand Up @@ -732,7 +807,7 @@ protected void validateAgainstExistingHCConfig(String configId, boolean indexing
)
);
} else {
validateCategoricalFieldsInAllIndices(configId, indexingDryRun, listener);
validateCategoricalFieldsInAllClusters(configId, indexingDryRun, listener);
}

}
Expand Down Expand Up @@ -794,18 +869,83 @@ protected void onSearchHCConfigResponse(SearchResponse response, String detector
}
listener.onFailure(new IllegalArgumentException(errorMsg));
} else {
validateCategoricalFieldsInAllIndices(detectorId, indexingDryRun, listener);
validateCategoricalFieldsInAllClusters(detectorId, indexingDryRun, listener);
}
}

protected void validateCategoricalFieldsInAllIndices(String configId, boolean indexingDryRun, ActionListener<T> listener) {
HashMap<String, List<String>> clusterIndicesMap = CrossClusterConfigUtils
.separateClusterIndexes(config.getIndices(), clusterService);
protected void validateCategoricalFieldsInAllClusters(String configId, boolean indexingDryRun, ActionListener<T> listener) {
List<String> wildcardClusterIndices = config.getIndices().stream().filter(idx -> idx.startsWith("*:")).toList();
List<String> nonWildcardClusterIndices = config.getIndices().stream().filter(idx -> !idx.startsWith("*:")).toList();

HashMap<String, List<String>> clusterIndicesMap = new HashMap<>();

// if no wildcard cluster indices found, fallback to typical validation
if (wildcardClusterIndices.isEmpty()) {
if (!nonWildcardClusterIndices.isEmpty()) {
HashMap<String, List<String>> nonWildcardIndicesMap = CrossClusterConfigUtils
.separateClusterIndexes(nonWildcardClusterIndices, clusterService);
clusterIndicesMap.putAll(nonWildcardIndicesMap);
validateCategoricalField(clusterIndicesMap.entrySet().iterator(), configId, indexingDryRun, listener);
} else {
listener
.onFailure(
new ValidationException(
"No indices specified for categorical field validation.",
ValidationIssueType.CATEGORY,
configValidationAspect
)
);
}
return;
}

// if there are wildcard cluster indices, check for remote clusters. throw exception if none configured
RemoteClusterService remoteClusterService = transportService.getRemoteClusterService();
Set<String> remoteClusters = remoteClusterService.getRegisteredRemoteClusterNames();

if (remoteClusters.isEmpty()) {
listener
.onFailure(
new ValidationException(
"Indices with wildcard cluster prefix specified, but no remote clusters are configured. Please configure remote clusters or remove the wildcard cluster prefix from the indices.",
ValidationIssueType.CATEGORY,
configValidationAspect
)
);
return;
}

// for each remote cluster, add all indices with wildcard cluster prefix
for (String remote : remoteClusters) {
List<String> remoteIndices = wildcardClusterIndices
.stream()
.map(idx -> idx.substring(2)) // remove "*:" prefix
.toList();

Iterator<Map.Entry<String, List<String>>> iterator = clusterIndicesMap.entrySet().iterator();
if (!remoteIndices.isEmpty()) {
clusterIndicesMap.put(remote, remoteIndices);
}
}

validateCategoricalField(iterator, configId, indexingDryRun, listener);
// add non-wildcard cluster indices (local or explicit remote) to the map
if (!nonWildcardClusterIndices.isEmpty()) {
HashMap<String, List<String>> nonWildcardIndicesMap = CrossClusterConfigUtils
.separateClusterIndexes(nonWildcardClusterIndices, clusterService);
clusterIndicesMap.putAll(nonWildcardIndicesMap);
}

if (clusterIndicesMap.isEmpty()) {
listener
.onFailure(
new ValidationException(
"No indices found for categorical field validation.",
ValidationIssueType.CATEGORY,
configValidationAspect
)
);
} else {
validateCategoricalField(clusterIndicesMap.entrySet().iterator(), configId, indexingDryRun, listener);
}
}

protected void validateCategoricalField(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.action.support.WriteRequest;
import org.opensearch.ad.model.ADTask;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.*;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
Expand Down Expand Up @@ -73,6 +74,7 @@ public abstract class BaseDeleteConfigTransportAction<TaskCacheManagerType exten
private final List<TaskTypeEnum> batchTaskTypes;
protected final String configIndexName;

@Inject
public BaseDeleteConfigTransportAction(
TransportService transportService,
ActionFilters actionFilters,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.*;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand Down Expand Up @@ -62,6 +63,7 @@ public class BaseEntityProfileTransportAction<RCFModelType extends ThresholdedRa
private final CacheProviderType cacheProvider;
private final String entityProfileAction;

@Inject
public BaseEntityProfileTransportAction(
ActionFilters actionFilters,
TransportService transportService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.inject.*;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
Expand Down Expand Up @@ -105,6 +106,7 @@ public abstract class BaseGetConfigTransportAction<GetConfigResponseType extends
private final TaskProfileRunnerType taskProfileRunner;
protected final String configIndexName;

@Inject
public BaseGetConfigTransportAction(
TransportService transportService,
DiscoveryNodeFilterer nodeFilter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.*;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand Down Expand Up @@ -60,6 +61,7 @@ public abstract class BaseJobTransportAction<IndexType extends Enum<IndexType> &
private final Clock clock;
private final Class<ConfigType> configTypeClass;

@Inject
public BaseJobTransportAction(
TransportService transportService,
ActionFilters actionFilters,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public abstract class BaseValidateConfigTransportAction<IndexType extends Enum<I
protected final ClusterService clusterService;
protected final NamedXContentRegistry xContentRegistry;
protected final IndexManagementType indexManagement;
protected final TransportService transportService;
protected final SearchFeatureDao searchFeatureDao;
protected final NamedWriteableRegistry namedWriteableRegistry;
protected volatile Boolean filterByEnabled;
Expand Down Expand Up @@ -90,6 +91,7 @@ public BaseValidateConfigTransportAction(
this.namedWriteableRegistry = namedWriteableRegistry;
this.filterByEnabled = filterByBackendRoleSetting.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(filterByBackendRoleSetting, it -> filterByEnabled = it);
this.transportService = transportService;
this.searchFeatureDao = searchFeatureDao;
this.clock = Clock.systemUTC();
this.settings = settings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.*;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.NetworkExceptionHelper;
Expand Down Expand Up @@ -137,6 +138,7 @@ public abstract class ResultProcessor<TransportResultRequestType extends ResultR

protected boolean runOnce;

@Inject
public ResultProcessor(
Setting<TimeValue> requestTimeoutSetting,
String entityResultAction,
Expand Down
Loading
Loading