Skip to content

integ tests for exporter n reader #267

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,17 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand All @@ -37,11 +43,13 @@
import org.junit.Before;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.MediaType;
import org.opensearch.core.xcontent.NamedXContentRegistry;
Expand All @@ -51,6 +59,8 @@

public abstract class QueryInsightsRestTestCase extends OpenSearchRestTestCase {
protected static final String QUERY_INSIGHTS_INDICES_PREFIX = "top_queries";
private static final Logger logger = Logger.getLogger(QueryInsightsRestTestCase.class.getName());
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.ROOT).withZone(ZoneOffset.UTC);

protected boolean isHttps() {
return Optional.ofNullable(System.getProperty("https")).map("true"::equalsIgnoreCase).orElse(false);
Expand Down Expand Up @@ -212,7 +222,7 @@ protected String defaultTopQueryGroupingSettings() {

protected String createDocumentsBody() {
return "{\n"
+ " \"@timestamp\": \"2099-11-15T13:12:00\",\n"
+ " \"@timestamp\": \"2024-04-01T13:12:00\",\n"
+ " \"message\": \"this is document 1\",\n"
+ " \"user\": {\n"
+ " \"id\": \"cyji\"\n"
Expand Down Expand Up @@ -380,4 +390,303 @@ protected void updateClusterSettings(Supplier<String> settingsSupplier) throws I
Response response = client().performRequest(request);
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
}

protected void createDocument() throws IOException {
String json = "{ \"title\": \"Test Document\", \"content\": \"This is a test document for OpenSearch\" }";
Request req = new Request("POST", "/my-index-0/_doc/");
req.setJsonEntity(json);
Response response = client().performRequest(req);
assertEquals(201, response.getStatusLine().getStatusCode());
}

protected void performSearch() throws IOException, InterruptedException {
Thread.sleep(5000);

String searchJson = "{ \"query\": { \"match\": { \"title\": \"Test Document\" } } }";
Request req = new Request("POST", "/my-index-0/_search?size=20");
req.setJsonEntity(searchJson);
Response response = client().performRequest(req);
assertEquals(200, response.getStatusLine().getStatusCode());
String content = new String(response.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8);
assertTrue("Expected search result for title", content.contains("\"Test Document\""));
}

protected void setLatencyWindowSize(String size) throws IOException {
String json = "{ \"persistent\": { \"search.insights.top_queries.latency.window_size\": \"" + size + "\" } }";
Request req = new Request("PUT", "/_cluster/settings");
req.setJsonEntity(json);
client().performRequest(req);
}

protected void defaultExporterSettings() throws IOException {
Request request = new Request("PUT", "/_cluster/settings");
request.setJsonEntity(
"{ \"persistent\": { "
+ "\"search.insights.top_queries.exporter.type\": \"local_index\", "
+ "\"search.insights.top_queries.latency.enabled\": \"true\" } }"
);
Response response = client().performRequest(request);
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
}

protected void cleanup() throws IOException, InterruptedException {
Thread.sleep(12000);

try {
client().performRequest(new Request("DELETE", "/top_queries"));
} catch (ResponseException e) {
logger.warning("Cleanup: Failed to delete /top_queries: " + e.getMessage());
}

try {
client().performRequest(new Request("DELETE", "/my-index-0"));
} catch (ResponseException e) {
logger.warning("Cleanup: Failed to delete /my-index-0: " + e.getMessage());
}

String resetSettings = "{ \"persistent\": { "
+ "\"search.insights.top_queries.exporter.type\": \"none\", "
+ "\"search.insights.top_queries.latency.enabled\": \"false\" } }";
Request resetReq = new Request("PUT", "/_cluster/settings");
resetReq.setJsonEntity(resetSettings);
client().performRequest(resetReq);
}

protected void cleanupIndextemplate() throws IOException, InterruptedException {
Thread.sleep(3000);

try {
client().performRequest(new Request("DELETE", "/_index_template"));
} catch (ResponseException e) {
logger.warning("Failed to delete /_index_template: " + e.getMessage());
}
}

protected void checkLocalIndices() throws IOException {
Request indicesRequest = new Request("GET", "/_cat/indices?v");
Response response = client().performRequest(indicesRequest);
assertEquals(200, response.getStatusLine().getStatusCode());

String responseContent = new String(response.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8);
assertTrue("Expected top_queries-* index to be green", responseContent.contains("green"));

String suffix = null;
Pattern pattern = Pattern.compile("top_queries-(\\d{4}\\.\\d{2}\\.\\d{2}-\\d+)");
Matcher matcher = pattern.matcher(responseContent);
if (matcher.find()) {
suffix = matcher.group(1);
} else {
fail("Failed to extract top_queries index suffix");
}

assertNotNull("Failed to extract suffix from top_queries-* index", suffix);
String fullIndexName = "top_queries-" + suffix;
assertTrue("Expected top_queries-{" + fullIndexName + "} index to be present", responseContent.contains(fullIndexName));

Request fetchRequest = new Request("GET", "/" + fullIndexName + "/_search?size=10");
Response fetchResponse = client().performRequest(fetchRequest);
assertEquals(200, fetchResponse.getStatusLine().getStatusCode());

byte[] bytes = fetchResponse.getEntity().getContent().readAllBytes();

try (
XContentParser parser = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
bytes
)
) {
Map<String, Object> responseMap = parser.map();

Map<String, Object> hitsWrapper = (Map<String, Object>) responseMap.get("hits");
List<Map<String, Object>> hits = (List<Map<String, Object>>) hitsWrapper.get("hits");

Map<String, Object> firstHit = hits.get(0);
Map<String, Object> source = (Map<String, Object>) firstHit.get("_source");

assertEquals("query_then_fetch", source.get("search_type"));
assertEquals("NONE", source.get("group_by"));
assertEquals(1, ((Number) source.get("total_shards")).intValue());

Map<String, Object> queryBlock = (Map<String, Object>) source.get("query");

if (queryBlock != null && queryBlock.containsKey("match")) {
Map<String, Object> match = (Map<String, Object>) queryBlock.get("match");
if (match != null && match.containsKey("title")) {
Map<String, Object> title = (Map<String, Object>) match.get("title");
if (title != null) {
assertEquals("Test Document", title.get("query"));
}
}
}

Map<String, Object> measurements = (Map<String, Object>) source.get("measurements");
assertNotNull("Expected measurements", measurements);
assertTrue(measurements.containsKey("cpu"));
assertTrue(measurements.containsKey("latency"));
assertTrue(measurements.containsKey("memory"));

List<Map<String, Object>> taskResourceUsages = (List<Map<String, Object>>) source.get("task_resource_usages");
assertTrue("Expected non-empty task_resource_usages", taskResourceUsages.size() > 0);
}
}

protected void checkQueryInsightsIndexTemplate() throws IOException {
Request request = new Request("GET", "/_index_template?pretty");
Response response = client().performRequest(request);
byte[] bytes = response.getEntity().getContent().readAllBytes();

try (
XContentParser parser = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
bytes
)
) {
Map<String, Object> parsed = parser.map();

List<Map<String, Object>> templates = (List<Map<String, Object>>) parsed.get("index_templates");
assertNotNull("Expected index_templates to exist", templates);
assertFalse("Expected at least one index_template", templates.isEmpty());

Map<String, Object> firstTemplate = templates.get(0);
assertEquals("query_insights_top_queries_template", firstTemplate.get("name"));

Map<String, Object> indexTemplate = (Map<String, Object>) firstTemplate.get("index_template");

List<String> indexPatterns = (List<String>) indexTemplate.get("index_patterns");
assertTrue("Expected index_patterns to include top_queries-*", indexPatterns.contains("top_queries-*"));

Map<String, Object> template = (Map<String, Object>) indexTemplate.get("template");
Map<String, Object> settings = (Map<String, Object>) template.get("settings");
Map<String, Object> indexSettings = (Map<String, Object>) settings.get("index");
assertEquals("1", indexSettings.get("number_of_shards"));
assertEquals("0-2", indexSettings.get("auto_expand_replicas"));

Map<String, Object> mappings = (Map<String, Object>) template.get("mappings");
Map<String, Object> meta = (Map<String, Object>) mappings.get("_meta");
assertEquals(1, ((Number) meta.get("schema_version")).intValue());
assertEquals("top_n_queries", meta.get("query_insights_feature_space"));

Map<String, Object> properties = (Map<String, Object>) mappings.get("properties");
assertTrue("Expected 'total_shards' in mappings", properties.containsKey("total_shards"));
assertTrue("Expected 'search_type' in mappings", properties.containsKey("search_type"));
assertTrue("Expected 'task_resource_usages' in mappings", properties.containsKey("task_resource_usages"));
assertTrue("Expected 'measurements' in mappings", properties.containsKey("measurements"));
}
}

protected void setLocalIndexToDebug() throws IOException {
String debugExporterJson = "{ \"persistent\": { \"search.insights.top_queries.exporter.type\": \"debug\" } }";
Request debugExporterRequest = new Request("PUT", "/_cluster/settings");
debugExporterRequest.setJsonEntity(debugExporterJson);
client().performRequest(debugExporterRequest);
}

protected void disableLocalIndexExporter() throws IOException {
String disableExporterJson = "{ \"persistent\": { \"search.insights.top_queries.exporter.type\": \"none\" } }";
Request disableExporterRequest = new Request("PUT", "/_cluster/settings");
disableExporterRequest.setJsonEntity(disableExporterJson);
client().performRequest(disableExporterRequest);
}

protected String[] invalidExporterSettings() {
return new String[] {
"{ \"persistent\" : { \"search.insights.top_queries.exporter.type\" : invalid_type } }",
"{ \"persistent\" : { \"search.insights.top_queries.exporter.type\" : local_index, \"search.insights.top_queries.exporter.config.index\" : \"1a2b\" } }" };
}

protected List<String[]> fetchHistoricalTopQueries(String ID, String NODEID, String Type) throws IOException {
String to = formatter.format(Instant.now());
String from = formatter.format(Instant.now().minusSeconds(9600)); // Default 160 minutes
return fetchHistoricalTopQueries(from, to, ID, NODEID, Type);
}

protected List<String[]> fetchHistoricalTopQueries(String from, String to, String filterId, String filterNodeID, String type)
throws IOException {
String endpoint = "/_insights/top_queries?from=" + from + "&to=" + to;

if (filterId != null && !filterId.equals("null")) {
endpoint += "&id=" + filterId;
}
if (filterNodeID != null && !filterNodeID.equals("null")) {
endpoint += "&nodeId=" + filterNodeID;
}
if (type != null && !type.equals("null")) {
endpoint += "&type=" + type;
}

Request fetchRequest = new Request("GET", endpoint);
Response fetchResponse = client().performRequest(fetchRequest);

assertEquals(200, fetchResponse.getStatusLine().getStatusCode());
byte[] content = fetchResponse.getEntity().getContent().readAllBytes();

try (
XContentParser parser = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
content
)
) {
Map<String, Object> root = parser.map();
List<Map<String, Object>> topQueries = (List<Map<String, Object>>) root.get("top_queries");
assertNotNull("Expected 'top_queries' field", topQueries);
assertFalse("Expected at least one top query", topQueries.isEmpty());

boolean matchFound = false;
List<String[]> idNodePairs = new ArrayList<>();

for (Map<String, Object> query : topQueries) {
assertTrue(query.containsKey("timestamp"));
assertEquals("query_then_fetch", query.get("search_type"));
assertTrue(((List<?>) query.get("indices")).contains("my-index-0"));
String id = (String) query.get("id");
String nodeId = (String) query.get("node_id");

// Validate ID if provided
if (filterId != "null") {
assertEquals("Expected id to match filter", filterId, id);
}
if (filterNodeID != "null") {
assertEquals("Expected id to match filter", filterNodeID, nodeId);
}

idNodePairs.add(new String[] { id, nodeId });

Map<String, Object> source = (Map<String, Object>) query.get("source");
Map<String, Object> queryBlock = (Map<String, Object>) source.get("query");
Map<String, Object> match = (Map<String, Object>) queryBlock.get("match");
Map<String, Object> title = (Map<String, Object>) match.get("title");
if ("Test Document".equals(title.get("query"))) {
matchFound = true;
}

List<Map<String, Object>> taskUsages = (List<Map<String, Object>>) query.get("task_resource_usages");
assertFalse("task_resource_usages should not be empty", taskUsages.isEmpty());
for (Map<String, Object> task : taskUsages) {
assertTrue("Missing action", task.containsKey("action"));
Map<String, Object> usage = (Map<String, Object>) task.get("taskResourceUsage");
assertNotNull("Missing cpu_time_in_nanos", usage.get("cpu_time_in_nanos"));
assertNotNull("Missing memory_in_bytes", usage.get("memory_in_bytes"));
}

Map<String, Object> measurements = (Map<String, Object>) query.get("measurements");
assertNotNull("Expected measurements", measurements);
assertTrue(measurements.containsKey("cpu"));
assertTrue(measurements.containsKey("memory"));
assertTrue(measurements.containsKey("latency"));
}

assertTrue("Expected at least one query with title='Test Document'", matchFound);
return idNodePairs;

}

}

protected List<String[]> fetchHistoricalTopQueries(Instant from, Instant to, String ID, String NODEID, String Type) throws IOException {
return fetchHistoricalTopQueries(formatter.format(from), formatter.format(to), ID, NODEID, Type);
}

}
Loading
Loading