Skip to content

Commit b6b371b

Browse files
committed
refactor(http/prom): request::NewRecordBodyData is metrics agnostic
this commit makes a mechanical refactor in the interest of paving ground towards using the request body metrics middleware in the inbound proxy. because of minor differences in how we represent protocols like gRPC in the inbound and outbound proxies, we must account for: (a): distinct `NewService<T>`'s accepting `Grpc<T>` and `Http<T>` targets, respectively, each holding a single metrics `Family<L>`. (b): a single `NewService<T>` that matches on the `Permitted<T>` enum to select one of two metrics `Family<L>`s. see #4119 for more background on this `Permitted<T>` enum. so, we should make the `NewRecordBodyData` agnostic to its metrics, rather than holding a concrete `RequestBodyFamilies<L>`. this commit hoists the metrics out of the middleware, and into the `X` and `ReqX` extractors used in the `NewService<T>` and `Service<T>` layers, respectively. bear particular attention to this part of the diff: ```diff - ReqX: ExtractParam<L, Request<ReqB>>, - L: linkerd_metrics::prom::encoding::EncodeLabelSet - + std::fmt::Debug - + std::hash::Hash - + Eq - + Clone - + Send - + Sync - + 'static, + ReqX: ExtractParam<BodyDataMetrics, Request<ReqB>>, ``` in other words, rather than using `X` and subsequently `ReqX` to extract our labels `L`, we expect these extractors to yield a `BodyDataMetrics`: one time series that should be incremented accordingly by the instrumented request. in even simpler words: `ReqX` now calls `familes.metrics(&labels)` too. Signed-off-by: katelyn martin <[email protected]>
1 parent 314c93c commit b6b371b

File tree

2 files changed

+53
-40
lines changed

2 files changed

+53
-40
lines changed

linkerd/app/outbound/src/http/logical/policy/route/metrics.rs

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use linkerd_app_core::{
55
svc,
66
};
77
use linkerd_http_prom::{
8-
body_data::request::{NewRecordBodyData, RequestBodyFamilies},
8+
body_data::request::{BodyDataMetrics, NewRecordBodyData, RequestBodyFamilies},
99
record_response::{self, StreamLabel},
1010
};
1111

@@ -61,24 +61,36 @@ pub type NewRecordDuration<T, M, N> =
6161
#[derive(Clone, Debug)]
6262
pub struct ExtractRecordDurationParams<M>(pub M);
6363

64+
/// Extracts a [`ExtractBodyDataParams`] from a target.
65+
#[derive(Clone, Debug)]
66+
pub struct ExtractBodyDataParams(RequestBodyFamilies<labels::Route>);
67+
68+
/// Extracts a single time series from the request body metrics.
69+
#[derive(Clone, Debug)]
70+
pub struct ExtractBodyDataMetrics<X> {
71+
/// The labeled family of metrics.
72+
metrics: RequestBodyFamilies<labels::Route>,
73+
/// Extracts [`labels::Route`] to select a time series.
74+
extract: X,
75+
}
76+
6477
pub fn layer<T, N>(
6578
metrics: &RequestMetrics<T::StreamLabel>,
6679
body_data: &RequestBodyFamilies<labels::Route>,
6780
) -> impl svc::Layer<
6881
N,
6982
Service = NewRecordBodyData<
7083
NewRecordDuration<T, RequestMetrics<T::StreamLabel>, N>,
71-
(),
72-
T,
73-
labels::Route,
84+
ExtractBodyDataParams,
85+
ExtractBodyDataMetrics<T>,
7486
>,
7587
>
7688
where
7789
T: Clone + MkStreamLabel,
7890
T: svc::ExtractParam<labels::Route, http::Request<http::BoxBody>>,
7991
{
8092
let record = NewRecordDuration::layer_via(ExtractRecordDurationParams(metrics.clone()));
81-
let body_data = NewRecordBodyData::new((), body_data.clone());
93+
let body_data = NewRecordBodyData::new(ExtractBodyDataParams(body_data.clone()));
8294

8395
svc::layer::mk(move |inner| {
8496
use svc::Layer;
@@ -172,6 +184,31 @@ where
172184
}
173185
}
174186

187+
// === impl ExtractRecordBodyDataParams ===
188+
189+
impl<T: Clone> svc::ExtractParam<ExtractBodyDataMetrics<T>, T> for ExtractBodyDataParams {
190+
fn extract_param(&self, t: &T) -> ExtractBodyDataMetrics<T> {
191+
let Self(metrics) = self;
192+
ExtractBodyDataMetrics {
193+
metrics: metrics.clone(),
194+
extract: t.clone(),
195+
}
196+
}
197+
}
198+
199+
// === impl ExtractRecordBodyDataMetrics ===
200+
201+
impl<B, X> svc::ExtractParam<BodyDataMetrics, http::Request<B>> for ExtractBodyDataMetrics<X>
202+
where
203+
X: svc::ExtractParam<labels::Route, http::Request<B>>,
204+
{
205+
fn extract_param(&self, req: &http::Request<B>) -> BodyDataMetrics {
206+
let Self { metrics, extract } = self;
207+
let labels = extract.extract_param(req);
208+
metrics.metrics(&labels)
209+
}
210+
}
211+
175212
// === impl LabelHttpRsp ===
176213

177214
impl<P> From<P> for LabelHttpRsp<P> {

linkerd/http/prom/src/body_data/request.rs

Lines changed: 11 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -10,90 +10,71 @@ use std::marker::PhantomData;
1010

1111
/// A [`NewService<T>`] that creates [`RecordBodyData`] services.
1212
#[derive(Clone, Debug)]
13-
pub struct NewRecordBodyData<N, X, ReqX, L> {
13+
pub struct NewRecordBodyData<N, X, ReqX> {
1414
/// The inner [`NewService<T>`].
1515
inner: N,
1616
extract: X,
17-
metrics: RequestBodyFamilies<L>,
1817
marker: PhantomData<ReqX>,
1918
}
2019

2120
/// Tracks body frames for an inner `S`-typed [`Service`].
2221
#[derive(Clone, Debug)]
23-
pub struct RecordBodyData<S, ReqX, L> {
22+
pub struct RecordBodyData<S, ReqX> {
2423
/// The inner [`Service<T>`].
2524
inner: S,
2625
extract: ReqX,
27-
metrics: RequestBodyFamilies<L>,
2826
}
2927

3028
// === impl NewRecordBodyData ===
3129

32-
impl<N, X, ReqX, L> NewRecordBodyData<N, X, ReqX, L>
30+
impl<N, X, ReqX> NewRecordBodyData<N, X, ReqX>
3331
where
3432
X: Clone,
35-
L: Clone,
3633
{
3734
/// Returns a [`Layer<S>`] that tracks body chunks.
3835
///
3936
/// This uses an `X`-typed [`ExtractParam<P, T>`] implementation to extract service parameters
4037
/// from a `T`-typed target.
41-
pub fn new(extract: X, metrics: RequestBodyFamilies<L>) -> impl Layer<N, Service = Self> {
38+
pub fn new(extract: X) -> impl Layer<N, Service = Self> {
4239
svc::layer::mk(move |inner| Self {
4340
inner,
4441
extract: extract.clone(),
45-
metrics: metrics.clone(),
4642
marker: PhantomData,
4743
})
4844
}
4945
}
5046

51-
impl<T, N, X, ReqX, L> NewService<T> for NewRecordBodyData<N, X, ReqX, L>
47+
impl<T, N, X, ReqX> NewService<T> for NewRecordBodyData<N, X, ReqX>
5248
where
5349
N: NewService<T>,
5450
X: ExtractParam<ReqX, T>,
55-
L: Clone,
5651
{
57-
type Service = RecordBodyData<N::Service, ReqX, L>;
52+
type Service = RecordBodyData<N::Service, ReqX>;
5853

5954
fn new_service(&self, target: T) -> Self::Service {
6055
let Self {
6156
inner,
6257
extract,
63-
metrics,
6458
marker: _,
6559
} = self;
6660

6761
let extract = extract.extract_param(&target);
6862
let inner = inner.new_service(target);
69-
let metrics = metrics.clone();
7063

71-
RecordBodyData {
72-
inner,
73-
extract,
74-
metrics,
75-
}
64+
RecordBodyData { inner, extract }
7665
}
7766
}
7867

7968
// === impl RecordBodyData ===
8069

81-
impl<ReqB, RespB, S, ReqX, L> Service<Request<ReqB>> for RecordBodyData<S, ReqX, L>
70+
impl<ReqB, RespB, S, ReqX> Service<Request<ReqB>> for RecordBodyData<S, ReqX>
8271
where
8372
S: Service<Request<BoxBody>, Response = Response<RespB>>,
8473
S::Future: Send + 'static,
8574
ReqB: http_body::Body + Send + 'static,
8675
ReqB::Data: Send + 'static,
8776
ReqB::Error: Into<Error>,
88-
ReqX: ExtractParam<L, Request<ReqB>>,
89-
L: linkerd_metrics::prom::encoding::EncodeLabelSet
90-
+ std::fmt::Debug
91-
+ std::hash::Hash
92-
+ Eq
93-
+ Clone
94-
+ Send
95-
+ Sync
96-
+ 'static,
77+
ReqX: ExtractParam<BodyDataMetrics, Request<ReqB>>,
9778
{
9879
type Response = S::Response;
9980
type Error = S::Error;
@@ -108,15 +89,10 @@ where
10889
}
10990

11091
fn call(&mut self, req: Request<ReqB>) -> Self::Future {
111-
let Self {
112-
inner,
113-
extract,
114-
metrics,
115-
} = self;
92+
let Self { inner, extract } = self;
11693

11794
let req = {
118-
let labels = extract.extract_param(&req);
119-
let metrics = metrics.metrics(&labels);
95+
let metrics = extract.extract_param(&req);
12096
let instrument = |b| super::body::Body::new(b, metrics);
12197
req.map(instrument).map(BoxBody::new)
12298
};

0 commit comments

Comments
 (0)