Skip to content

refactor: remove prom store write dispatch #5812

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 13 additions & 31 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use crate::error::{
ToJsonSnafu,
};
use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
use crate::http::prom_store::PromStoreState;
use crate::http::prometheus::{
build_info_query, format_query, instant_query, label_values_query, labels_query, parse_query,
range_query, series_query,
Expand Down Expand Up @@ -554,10 +555,16 @@ impl HttpServerBuilder {
prom_store_with_metric_engine: bool,
is_strict_mode: bool,
) -> Self {
let state = PromStoreState {
prom_store_handler: handler,
prom_store_with_metric_engine,
is_strict_mode,
};

Self {
router: self.router.nest(
&format!("/{HTTP_API_VERSION}/prometheus"),
HttpServer::route_prom(handler, prom_store_with_metric_engine, is_strict_mode),
HttpServer::route_prom(state),
),
..self
}
Expand Down Expand Up @@ -1000,36 +1007,11 @@ impl HttpServer {
///
/// [read]: https://prometheus.io/docs/prometheus/latest/querying/remote_read_api/
/// [write]: https://prometheus.io/docs/concepts/remote_write_spec/
fn route_prom<S>(
prom_handler: PromStoreProtocolHandlerRef,
prom_store_with_metric_engine: bool,
is_strict_mode: bool,
) -> Router<S> {
let mut router = Router::new().route("/read", routing::post(prom_store::remote_read));
match (prom_store_with_metric_engine, is_strict_mode) {
(true, true) => {
router = router.route("/write", routing::post(prom_store::remote_write))
}
(true, false) => {
router = router.route(
"/write",
routing::post(prom_store::remote_write_without_strict_mode),
)
}
(false, true) => {
router = router.route(
"/write",
routing::post(prom_store::route_write_without_metric_engine),
)
}
(false, false) => {
router = router.route(
"/write",
routing::post(prom_store::route_write_without_metric_engine_and_strict_mode),
)
}
}
router.with_state(prom_handler)
fn route_prom<S>(state: PromStoreState) -> Router<S> {
Router::new()
.route("/read", routing::post(prom_store::remote_read))
.route("/write", routing::post(prom_store::remote_write))
.with_state(state)
}

fn route_influxdb<S>(influxdb_handler: InfluxdbLineProtocolHandlerRef) -> Router<S> {
Expand Down
88 changes: 14 additions & 74 deletions src/servers/src/http/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ pub const DEFAULT_ENCODING: &str = "snappy";
pub const VM_ENCODING: &str = "zstd";
pub const VM_PROTO_VERSION: &str = "1";

#[derive(Clone)]
pub struct PromStoreState {
pub prom_store_handler: PromStoreProtocolHandlerRef,
pub prom_store_with_metric_engine: bool,
pub is_strict_mode: bool,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct RemoteWriteQuery {
pub db: Option<String>,
Expand All @@ -69,99 +76,32 @@ impl Default for RemoteWriteQuery {
}
}

/// Same with [remote_write] but won't store data to metric engine.
#[axum_macros::debug_handler]
pub async fn route_write_without_metric_engine(
handler: State<PromStoreProtocolHandlerRef>,
query: Query<RemoteWriteQuery>,
extension: Extension<QueryContext>,
content_encoding: TypedHeader<headers::ContentEncoding>,
raw_body: Bytes,
) -> Result<impl IntoResponse> {
remote_write_impl(
handler,
query,
extension,
content_encoding,
raw_body,
true,
false,
)
.await
}

/// Same with [remote_write] but won't store data to metric engine.
/// And without strict_mode on will not check invalid UTF-8.
#[axum_macros::debug_handler]
pub async fn route_write_without_metric_engine_and_strict_mode(
handler: State<PromStoreProtocolHandlerRef>,
query: Query<RemoteWriteQuery>,
extension: Extension<QueryContext>,
content_encoding: TypedHeader<headers::ContentEncoding>,
raw_body: Bytes,
) -> Result<impl IntoResponse> {
remote_write_impl(
handler,
query,
extension,
content_encoding,
raw_body,
false,
false,
)
.await
}

#[axum_macros::debug_handler]
#[tracing::instrument(
skip_all,
fields(protocol = "prometheus", request_type = "remote_write")
)]
pub async fn remote_write(
handler: State<PromStoreProtocolHandlerRef>,
query: Query<RemoteWriteQuery>,
extension: Extension<QueryContext>,
content_encoding: TypedHeader<headers::ContentEncoding>,
raw_body: Bytes,
) -> Result<impl IntoResponse> {
remote_write_impl(
handler,
query,
extension,
content_encoding,
raw_body,
true,
true,
)
.await
}

#[axum_macros::debug_handler]
#[tracing::instrument(
skip_all,
fields(protocol = "prometheus", request_type = "remote_write")
)]
pub async fn remote_write_without_strict_mode(
handler: State<PromStoreProtocolHandlerRef>,
State(state): State<PromStoreState>,
query: Query<RemoteWriteQuery>,
extension: Extension<QueryContext>,
content_encoding: TypedHeader<headers::ContentEncoding>,
raw_body: Bytes,
) -> Result<impl IntoResponse> {
remote_write_impl(
handler,
state.prom_store_handler,
query,
extension,
content_encoding,
raw_body,
false,
true,
state.is_strict_mode,
state.prom_store_with_metric_engine,
)
.await
}

async fn remote_write_impl(
State(handler): State<PromStoreProtocolHandlerRef>,
handler: PromStoreProtocolHandlerRef,
Query(params): Query<RemoteWriteQuery>,
Extension(mut query_ctx): Extension<QueryContext>,
content_encoding: TypedHeader<headers::ContentEncoding>,
Expand Down Expand Up @@ -222,7 +162,7 @@ impl IntoResponse for PromStoreResponse {
fields(protocol = "prometheus", request_type = "remote_read")
)]
pub async fn remote_read(
State(handler): State<PromStoreProtocolHandlerRef>,
State(state): State<PromStoreState>,
Query(params): Query<RemoteWriteQuery>,
Extension(mut query_ctx): Extension<QueryContext>,
body: Bytes,
Expand All @@ -236,7 +176,7 @@ pub async fn remote_read(

let request = decode_remote_read_request(body).await?;

handler.read(request, query_ctx).await
state.prom_store_handler.read(request, query_ctx).await
}

fn try_decompress(is_zstd: bool, body: &[u8]) -> Result<Bytes> {
Expand Down
Loading