Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add direct-query-core module for prometheus integration #3440

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
/** Language type accepted in async query apis. */
public enum LangType {
SQL("sql"),
PPL("ppl");
PPL("ppl"),
PROMQL("promql");
private final String text;

LangType(String text) {
Expand Down
107 changes: 107 additions & 0 deletions direct-query-core/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java-library'
id "io.freefair.lombok"
id 'jacoco'
id 'java-test-fixtures'
}

repositories {
mavenCentral()
}

dependencies {
api project(':core')
implementation project(':protocol')
implementation project(':opensearch')
implementation project(':datasources')
implementation project(':async-query-core')

// Common dependencies
implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
implementation group: 'org.json', name: 'json', version: '20231013'
implementation group: 'commons-io', name: 'commons-io', version: "${commons_io_version}"

// Test dependencies
testImplementation(platform("org.junit:junit-bom:5.9.3"))
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.9.3'
testImplementation group: 'org.mockito', name: 'mockito-core', version: "${mockito_version}"
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: "${mockito_version}"
testImplementation group: 'com.squareup.okhttp3', name: 'mockwebserver', version: '4.12.0'

testCompileOnly('junit:junit:4.13.1') {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
}
testRuntimeOnly("org.junit.vintage:junit-vintage-engine") {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
}
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine") {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
}
testImplementation("org.opensearch.test:framework:${opensearch_version}")
}

test {
useJUnitPlatform()
testLogging {
events "failed"
exceptionFormat "full"
}
}
task junit4(type: Test) {
useJUnitPlatform {
includeEngines("junit-vintage")
}
systemProperty 'tests.security.manager', 'false'
testLogging {
events "failed"
exceptionFormat "full"
}
}

jacocoTestReport {
dependsOn test, junit4
executionData test, junit4
reports {
html.required = true
xml.required = true
}
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it)
}))
}
}

jacocoTestCoverageVerification {
dependsOn test, junit4
executionData test, junit4
violationRules {
rule {
element = 'CLASS'
excludes = [
'org.opensearch.sql.prometheus.model.*',
'org.opensearch.sql.directquery.rest.model.*'
]
limit {
counter = 'LINE'
minimum = 1.0
}
limit {
counter = 'BRANCH'
minimum = 1.0
}
}
}
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it)
}))
}
}
check.dependsOn jacocoTestCoverageVerification
jacocoTestCoverageVerification.dependsOn jacocoTestReport
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.datasource.client;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Objects;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.inject.Inject;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.client.exceptions.DataSourceClientException;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.prometheus.client.PrometheusClient;
import org.opensearch.sql.prometheus.client.PrometheusClientImpl;
import org.opensearch.sql.prometheus.utils.PrometheusClientUtils;

/** Factory for creating data source clients based on the data source type. */
public class DataSourceClientFactory {

public static final String URI = "prometheus.uri";

private static final Logger LOG = LogManager.getLogger();

private final Settings settings;
private final DataSourceService dataSourceService;

@Inject
public DataSourceClientFactory(DataSourceService dataSourceService, Settings settings) {
this.settings = settings;
this.dataSourceService = dataSourceService;
}

/**
* Creates a client for the specified data source with appropriate type.
*
* @param <T> The type of client to create
* @param dataSourceName The name of the data source
* @return The appropriate client for the data source type
* @throws DataSourceClientException If client creation fails
*/
@SuppressWarnings("unchecked")
public <T> T createClient(String dataSourceName) throws DataSourceClientException {
try {
if (!dataSourceService.dataSourceExists(dataSourceName)) {
throw new DataSourceClientException("Data source does not exist: " + dataSourceName);
}

DataSourceMetadata metadata = dataSourceService.getDataSourceMetadata(dataSourceName);
DataSourceType dataSourceType = metadata.getConnector();

return (T) createClientForType(dataSourceType.name(), metadata);
} catch (Exception e) {
if (e instanceof DataSourceClientException) {
throw e;
}
LOG.error("Failed to create client for data source: " + dataSourceName, e);
throw new DataSourceClientException(
"Failed to create client for data source: " + dataSourceName, e);
}
}

/**
* Gets the data source type for a given data source name.
*
* @param dataSourceName The name of the data source
* @return The type of the data source
* @throws DataSourceClientException If the data source doesn't exist
*/
public DataSourceType getDataSourceType(String dataSourceName) throws DataSourceClientException {
if (!dataSourceService.dataSourceExists(dataSourceName)) {
throw new DataSourceClientException("Data source does not exist: " + dataSourceName);
}

return dataSourceService.getDataSourceMetadata(dataSourceName).getConnector();
}

private Object createClientForType(String dataSourceType, DataSourceMetadata metadata)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might be better to create a base client interface instead of Object and generics. Currently it's not straightforward to find all data source client implementations

throws DataSourceClientException {
switch (dataSourceType) {
case "PROMETHEUS":
return createPrometheusClient(metadata);
// Add cases for other data source types as needed
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove commment

default:
throw new DataSourceClientException("Unsupported data source type: " + dataSourceType);
}
}

private PrometheusClient createPrometheusClient(DataSourceMetadata metadata) {
try {
// replace this with validate properties in PrometheusStorageFactory
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not needed?

String host = metadata.getProperties().get(URI);
if (Objects.isNull(host)) {
throw new DataSourceClientException("Host is required for Prometheus data source");
}

URI uri = new URI(host);
return new PrometheusClientImpl(
PrometheusClientUtils.getHttpClient(metadata.getProperties(), settings), uri);
} catch (URISyntaxException e) {
throw new DataSourceClientException("Invalid Prometheus URI", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.datasource.client.exceptions;

/** Exception thrown when there are issues with data source client operations. */
public class DataSourceClientException extends RuntimeException {

public DataSourceClientException(String message) {
super(message);
}

public DataSourceClientException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.datasource.query;

import java.io.IOException;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.directquery.rest.model.ExecuteDirectQueryRequest;
import org.opensearch.sql.directquery.rest.model.GetDirectQueryResourcesRequest;
import org.opensearch.sql.directquery.rest.model.GetDirectQueryResourcesResponse;

/**
* Interface for handling queries for specific data source types.
*
* @param <T> The client type this handler works with
*/
public interface QueryHandler<T> {

/**
* Returns the data source type this handler supports.
*
* @return The supported data source type
*/
DataSourceType getSupportedDataSourceType();

/**
* Executes a query for the supported data source type.
*
* @param client The client instance to use
* @param request The query request
* @return JSON string result of the query
* @throws IOException If query execution fails
*/
String executeQuery(T client, ExecuteDirectQueryRequest request) throws IOException;

/**
* Gets resources from the data source.
*
* @param client The client instance to use
* @param request The resources request
* @return Response containing the requested resources
* @throws IOException If resource retrieval fails
*/
GetDirectQueryResourcesResponse<?> getResources(T client, GetDirectQueryResourcesRequest request)
throws IOException;

/**
* Checks if this handler can handle the given client type.
*
* @param client The client to check
* @return true if this handler can handle the client
*/
boolean canHandle(T client);

/**
* Gets the client class this handler supports.
*
* @return The class of client this handler supports
*/
Class<T> getClientClass();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.datasource.query;

import java.util.List;
import java.util.Optional;
import org.opensearch.common.inject.Inject;

/** Registry for all query handlers. */
public class QueryHandlerRegistry {

private final List<QueryHandler<?>> handlers;

@Inject
public QueryHandlerRegistry(List<QueryHandler<?>> handlers) {
this.handlers = handlers;
}

/**
* Finds a handler that can process the given client.
*
* @param client The client to find a handler for
* @param <T> The type of client
* @return An optional containing the handler if found
*/
@SuppressWarnings("unchecked")
public <T> Optional<QueryHandler<T>> getQueryHandler(T client) {
return handlers.stream()
.filter(
handler -> {
try {
// Get the handler's client class and check if it's compatible with our client
Class<?> handlerClientClass = handler.getClientClass();
return handlerClientClass.isInstance(client)
&& ((QueryHandler<T>) handler).canHandle(client);
} catch (ClassCastException e) {
return false;
}
})
.map(handler -> (QueryHandler<T>) handler)
.findFirst();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.directquery;

import org.opensearch.sql.directquery.rest.model.ExecuteDirectQueryRequest;
import org.opensearch.sql.directquery.rest.model.ExecuteDirectQueryResponse;
import org.opensearch.sql.directquery.rest.model.GetDirectQueryResourcesRequest;
import org.opensearch.sql.directquery.rest.model.GetDirectQueryResourcesResponse;

public interface DirectQueryExecutorService {

/**
* Execute a direct query request.
*
* @param request The direct query request
* @return A response containing the result
*/
ExecuteDirectQueryResponse executeDirectQuery(ExecuteDirectQueryRequest request);

/**
* Get resources from a data source.
*
* @param request The resources request
* @return A response containing the requested resources
*/
GetDirectQueryResourcesResponse<?> getDirectQueryResources(
GetDirectQueryResourcesRequest request);
}
Loading
Loading