Skip to content

Commit f35a4e6

Browse files
shuiyisongzyy17
authored andcommitted
refactor: remove prom store write dispatch (GreptimeTeam#5812)
* refactor: remove prom store remote write dispatch pattern * chore: ref XIX-22
1 parent 69c5fd8 commit f35a4e6

File tree

2 files changed

+27
-105
lines changed

2 files changed

+27
-105
lines changed

src/servers/src/http.rs

+13-31
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ use crate::error::{
5757
ToJsonSnafu,
5858
};
5959
use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
60+
use crate::http::prom_store::PromStoreState;
6061
use crate::http::prometheus::{
6162
build_info_query, format_query, instant_query, label_values_query, labels_query, parse_query,
6263
range_query, series_query,
@@ -555,10 +556,16 @@ impl HttpServerBuilder {
555556
prom_store_with_metric_engine: bool,
556557
is_strict_mode: bool,
557558
) -> Self {
559+
let state = PromStoreState {
560+
prom_store_handler: handler,
561+
prom_store_with_metric_engine,
562+
is_strict_mode,
563+
};
564+
558565
Self {
559566
router: self.router.nest(
560567
&format!("/{HTTP_API_VERSION}/prometheus"),
561-
HttpServer::route_prom(handler, prom_store_with_metric_engine, is_strict_mode),
568+
HttpServer::route_prom(state),
562569
),
563570
..self
564571
}
@@ -1004,36 +1011,11 @@ impl HttpServer {
10041011
///
10051012
/// [read]: https://prometheus.io/docs/prometheus/latest/querying/remote_read_api/
10061013
/// [write]: https://prometheus.io/docs/concepts/remote_write_spec/
1007-
fn route_prom<S>(
1008-
prom_handler: PromStoreProtocolHandlerRef,
1009-
prom_store_with_metric_engine: bool,
1010-
is_strict_mode: bool,
1011-
) -> Router<S> {
1012-
let mut router = Router::new().route("/read", routing::post(prom_store::remote_read));
1013-
match (prom_store_with_metric_engine, is_strict_mode) {
1014-
(true, true) => {
1015-
router = router.route("/write", routing::post(prom_store::remote_write))
1016-
}
1017-
(true, false) => {
1018-
router = router.route(
1019-
"/write",
1020-
routing::post(prom_store::remote_write_without_strict_mode),
1021-
)
1022-
}
1023-
(false, true) => {
1024-
router = router.route(
1025-
"/write",
1026-
routing::post(prom_store::route_write_without_metric_engine),
1027-
)
1028-
}
1029-
(false, false) => {
1030-
router = router.route(
1031-
"/write",
1032-
routing::post(prom_store::route_write_without_metric_engine_and_strict_mode),
1033-
)
1034-
}
1035-
}
1036-
router.with_state(prom_handler)
1014+
fn route_prom<S>(state: PromStoreState) -> Router<S> {
1015+
Router::new()
1016+
.route("/read", routing::post(prom_store::remote_read))
1017+
.route("/write", routing::post(prom_store::remote_write))
1018+
.with_state(state)
10371019
}
10381020

10391021
fn route_influxdb<S>(influxdb_handler: InfluxdbLineProtocolHandlerRef) -> Router<S> {

src/servers/src/http/prom_store.rs

+14-74
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@ pub const DEFAULT_ENCODING: &str = "snappy";
4949
pub const VM_ENCODING: &str = "zstd";
5050
pub const VM_PROTO_VERSION: &str = "1";
5151

52+
#[derive(Clone)]
53+
pub struct PromStoreState {
54+
pub prom_store_handler: PromStoreProtocolHandlerRef,
55+
pub prom_store_with_metric_engine: bool,
56+
pub is_strict_mode: bool,
57+
}
58+
5259
#[derive(Debug, Serialize, Deserialize)]
5360
pub struct RemoteWriteQuery {
5461
pub db: Option<String>,
@@ -69,99 +76,32 @@ impl Default for RemoteWriteQuery {
6976
}
7077
}
7178

72-
/// Same with [remote_write] but won't store data to metric engine.
73-
#[axum_macros::debug_handler]
74-
pub async fn route_write_without_metric_engine(
75-
handler: State<PromStoreProtocolHandlerRef>,
76-
query: Query<RemoteWriteQuery>,
77-
extension: Extension<QueryContext>,
78-
content_encoding: TypedHeader<headers::ContentEncoding>,
79-
raw_body: Bytes,
80-
) -> Result<impl IntoResponse> {
81-
remote_write_impl(
82-
handler,
83-
query,
84-
extension,
85-
content_encoding,
86-
raw_body,
87-
true,
88-
false,
89-
)
90-
.await
91-
}
92-
93-
/// Same with [remote_write] but won't store data to metric engine.
94-
/// And without strict_mode on will not check invalid UTF-8.
95-
#[axum_macros::debug_handler]
96-
pub async fn route_write_without_metric_engine_and_strict_mode(
97-
handler: State<PromStoreProtocolHandlerRef>,
98-
query: Query<RemoteWriteQuery>,
99-
extension: Extension<QueryContext>,
100-
content_encoding: TypedHeader<headers::ContentEncoding>,
101-
raw_body: Bytes,
102-
) -> Result<impl IntoResponse> {
103-
remote_write_impl(
104-
handler,
105-
query,
106-
extension,
107-
content_encoding,
108-
raw_body,
109-
false,
110-
false,
111-
)
112-
.await
113-
}
114-
11579
#[axum_macros::debug_handler]
11680
#[tracing::instrument(
11781
skip_all,
11882
fields(protocol = "prometheus", request_type = "remote_write")
11983
)]
12084
pub async fn remote_write(
121-
handler: State<PromStoreProtocolHandlerRef>,
122-
query: Query<RemoteWriteQuery>,
123-
extension: Extension<QueryContext>,
124-
content_encoding: TypedHeader<headers::ContentEncoding>,
125-
raw_body: Bytes,
126-
) -> Result<impl IntoResponse> {
127-
remote_write_impl(
128-
handler,
129-
query,
130-
extension,
131-
content_encoding,
132-
raw_body,
133-
true,
134-
true,
135-
)
136-
.await
137-
}
138-
139-
#[axum_macros::debug_handler]
140-
#[tracing::instrument(
141-
skip_all,
142-
fields(protocol = "prometheus", request_type = "remote_write")
143-
)]
144-
pub async fn remote_write_without_strict_mode(
145-
handler: State<PromStoreProtocolHandlerRef>,
85+
State(state): State<PromStoreState>,
14686
query: Query<RemoteWriteQuery>,
14787
extension: Extension<QueryContext>,
14888
content_encoding: TypedHeader<headers::ContentEncoding>,
14989
raw_body: Bytes,
15090
) -> Result<impl IntoResponse> {
15191
remote_write_impl(
152-
handler,
92+
state.prom_store_handler,
15393
query,
15494
extension,
15595
content_encoding,
15696
raw_body,
157-
false,
158-
true,
97+
state.is_strict_mode,
98+
state.prom_store_with_metric_engine,
15999
)
160100
.await
161101
}
162102

163103
async fn remote_write_impl(
164-
State(handler): State<PromStoreProtocolHandlerRef>,
104+
handler: PromStoreProtocolHandlerRef,
165105
Query(params): Query<RemoteWriteQuery>,
166106
Extension(mut query_ctx): Extension<QueryContext>,
167107
content_encoding: TypedHeader<headers::ContentEncoding>,
@@ -222,7 +162,7 @@ impl IntoResponse for PromStoreResponse {
222162
fields(protocol = "prometheus", request_type = "remote_read")
223163
)]
224164
pub async fn remote_read(
225-
State(handler): State<PromStoreProtocolHandlerRef>,
165+
State(state): State<PromStoreState>,
226166
Query(params): Query<RemoteWriteQuery>,
227167
Extension(mut query_ctx): Extension<QueryContext>,
228168
body: Bytes,
@@ -236,7 +176,7 @@ pub async fn remote_read(
236176

237177
let request = decode_remote_read_request(body).await?;
238178

239-
handler.read(request, query_ctx).await
179+
state.prom_store_handler.read(request, query_ctx).await
240180
}
241181

242182
fn try_decompress(is_zstd: bool, body: &[u8]) -> Result<Bytes> {

0 commit comments

Comments
 (0)