diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 2251f75eca6f..0a9fc6320006 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -57,6 +57,7 @@ use crate::error::{ ToJsonSnafu, }; use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2}; +use crate::http::prom_store::PromStoreState; use crate::http::prometheus::{ build_info_query, format_query, instant_query, label_values_query, labels_query, parse_query, range_query, series_query, @@ -554,10 +555,16 @@ impl HttpServerBuilder { prom_store_with_metric_engine: bool, is_strict_mode: bool, ) -> Self { + let state = PromStoreState { + prom_store_handler: handler, + prom_store_with_metric_engine, + is_strict_mode, + }; + Self { router: self.router.nest( &format!("/{HTTP_API_VERSION}/prometheus"), - HttpServer::route_prom(handler, prom_store_with_metric_engine, is_strict_mode), + HttpServer::route_prom(state), ), ..self } @@ -1000,36 +1007,11 @@ impl HttpServer { /// /// [read]: https://prometheus.io/docs/prometheus/latest/querying/remote_read_api/ /// [write]: https://prometheus.io/docs/concepts/remote_write_spec/ - fn route_prom( - prom_handler: PromStoreProtocolHandlerRef, - prom_store_with_metric_engine: bool, - is_strict_mode: bool, - ) -> Router { - let mut router = Router::new().route("/read", routing::post(prom_store::remote_read)); - match (prom_store_with_metric_engine, is_strict_mode) { - (true, true) => { - router = router.route("/write", routing::post(prom_store::remote_write)) - } - (true, false) => { - router = router.route( - "/write", - routing::post(prom_store::remote_write_without_strict_mode), - ) - } - (false, true) => { - router = router.route( - "/write", - routing::post(prom_store::route_write_without_metric_engine), - ) - } - (false, false) => { - router = router.route( - "/write", - routing::post(prom_store::route_write_without_metric_engine_and_strict_mode), - ) - } - } - router.with_state(prom_handler) + fn route_prom(state: PromStoreState) -> Router { + Router::new() + .route("/read", routing::post(prom_store::remote_read)) + .route("/write", routing::post(prom_store::remote_write)) + .with_state(state) } fn route_influxdb(influxdb_handler: InfluxdbLineProtocolHandlerRef) -> Router { diff --git a/src/servers/src/http/prom_store.rs b/src/servers/src/http/prom_store.rs index 581afcd57a31..82a0b9324368 100644 --- a/src/servers/src/http/prom_store.rs +++ b/src/servers/src/http/prom_store.rs @@ -49,6 +49,13 @@ pub const DEFAULT_ENCODING: &str = "snappy"; pub const VM_ENCODING: &str = "zstd"; pub const VM_PROTO_VERSION: &str = "1"; +#[derive(Clone)] +pub struct PromStoreState { + pub prom_store_handler: PromStoreProtocolHandlerRef, + pub prom_store_with_metric_engine: bool, + pub is_strict_mode: bool, +} + #[derive(Debug, Serialize, Deserialize)] pub struct RemoteWriteQuery { pub db: Option, @@ -69,99 +76,32 @@ impl Default for RemoteWriteQuery { } } -/// Same with [remote_write] but won't store data to metric engine. -#[axum_macros::debug_handler] -pub async fn route_write_without_metric_engine( - handler: State, - query: Query, - extension: Extension, - content_encoding: TypedHeader, - raw_body: Bytes, -) -> Result { - remote_write_impl( - handler, - query, - extension, - content_encoding, - raw_body, - true, - false, - ) - .await -} - -/// Same with [remote_write] but won't store data to metric engine. -/// And without strict_mode on will not check invalid UTF-8. -#[axum_macros::debug_handler] -pub async fn route_write_without_metric_engine_and_strict_mode( - handler: State, - query: Query, - extension: Extension, - content_encoding: TypedHeader, - raw_body: Bytes, -) -> Result { - remote_write_impl( - handler, - query, - extension, - content_encoding, - raw_body, - false, - false, - ) - .await -} - #[axum_macros::debug_handler] #[tracing::instrument( skip_all, fields(protocol = "prometheus", request_type = "remote_write") )] pub async fn remote_write( - handler: State, - query: Query, - extension: Extension, - content_encoding: TypedHeader, - raw_body: Bytes, -) -> Result { - remote_write_impl( - handler, - query, - extension, - content_encoding, - raw_body, - true, - true, - ) - .await -} - -#[axum_macros::debug_handler] -#[tracing::instrument( - skip_all, - fields(protocol = "prometheus", request_type = "remote_write") -)] -pub async fn remote_write_without_strict_mode( - handler: State, + State(state): State, query: Query, extension: Extension, content_encoding: TypedHeader, raw_body: Bytes, ) -> Result { remote_write_impl( - handler, + state.prom_store_handler, query, extension, content_encoding, raw_body, - false, - true, + state.is_strict_mode, + state.prom_store_with_metric_engine, ) .await } async fn remote_write_impl( - State(handler): State, + handler: PromStoreProtocolHandlerRef, Query(params): Query, Extension(mut query_ctx): Extension, content_encoding: TypedHeader, @@ -222,7 +162,7 @@ impl IntoResponse for PromStoreResponse { fields(protocol = "prometheus", request_type = "remote_read") )] pub async fn remote_read( - State(handler): State, + State(state): State, Query(params): Query, Extension(mut query_ctx): Extension, body: Bytes, @@ -236,7 +176,7 @@ pub async fn remote_read( let request = decode_remote_read_request(body).await?; - handler.read(request, query_ctx).await + state.prom_store_handler.read(request, query_ctx).await } fn try_decompress(is_zstd: bool, body: &[u8]) -> Result {