@@ -15,16 +15,6 @@ use super::{
15
15
precomputed_sum:: PrecomputedSum , sum:: Sum , Number ,
16
16
} ;
17
17
18
- pub ( crate ) const STREAM_CARDINALITY_LIMIT : usize = 2000 ;
19
-
20
- /// Checks whether aggregator has hit cardinality limit for metric streams
21
- pub ( crate ) fn is_under_cardinality_limit ( _size : usize ) -> bool {
22
- true
23
-
24
- // TODO: Implement this feature, after allowing the ability to customize the cardinality limit.
25
- // size < STREAM_CARDINALITY_LIMIT
26
- }
27
-
28
18
/// Receives measurements to be aggregated.
29
19
pub ( crate ) trait Measure < T > : Send + Sync + ' static {
30
20
fn call ( & self , measurement : T , attrs : & [ KeyValue ] ) ;
@@ -133,14 +123,22 @@ pub(crate) struct AggregateBuilder<T> {
133
123
/// measurements.
134
124
filter : AttributeSetFilter ,
135
125
126
+ /// Cardinality limit for the metric stream
127
+ cardinality_limit : usize ,
128
+
136
129
_marker : marker:: PhantomData < T > ,
137
130
}
138
131
139
132
impl < T : Number > AggregateBuilder < T > {
140
- pub ( crate ) fn new ( temporality : Temporality , filter : Option < Filter > ) -> Self {
133
+ pub ( crate ) fn new (
134
+ temporality : Temporality ,
135
+ filter : Option < Filter > ,
136
+ cardinality_limit : usize ,
137
+ ) -> Self {
141
138
AggregateBuilder {
142
139
temporality,
143
140
filter : AttributeSetFilter :: new ( filter) ,
141
+ cardinality_limit,
144
142
_marker : marker:: PhantomData ,
145
143
}
146
144
}
@@ -150,18 +148,31 @@ impl<T: Number> AggregateBuilder<T> {
150
148
LastValue :: new (
151
149
overwrite_temporality. unwrap_or ( self . temporality ) ,
152
150
self . filter . clone ( ) ,
151
+ self . cardinality_limit ,
153
152
)
154
153
. into ( )
155
154
}
156
155
157
156
/// Builds a precomputed sum aggregate function input and output.
158
157
pub ( crate ) fn precomputed_sum ( & self , monotonic : bool ) -> AggregateFns < T > {
159
- PrecomputedSum :: new ( self . temporality , self . filter . clone ( ) , monotonic) . into ( )
158
+ PrecomputedSum :: new (
159
+ self . temporality ,
160
+ self . filter . clone ( ) ,
161
+ monotonic,
162
+ self . cardinality_limit ,
163
+ )
164
+ . into ( )
160
165
}
161
166
162
167
/// Builds a sum aggregate function input and output.
163
168
pub ( crate ) fn sum ( & self , monotonic : bool ) -> AggregateFns < T > {
164
- Sum :: new ( self . temporality , self . filter . clone ( ) , monotonic) . into ( )
169
+ Sum :: new (
170
+ self . temporality ,
171
+ self . filter . clone ( ) ,
172
+ monotonic,
173
+ self . cardinality_limit ,
174
+ )
175
+ . into ( )
165
176
}
166
177
167
178
/// Builds a histogram aggregate function input and output.
@@ -177,6 +188,7 @@ impl<T: Number> AggregateBuilder<T> {
177
188
boundaries,
178
189
record_min_max,
179
190
record_sum,
191
+ self . cardinality_limit ,
180
192
)
181
193
. into ( )
182
194
}
@@ -196,6 +208,7 @@ impl<T: Number> AggregateBuilder<T> {
196
208
max_scale,
197
209
record_min_max,
198
210
record_sum,
211
+ self . cardinality_limit ,
199
212
)
200
213
. into ( )
201
214
}
@@ -211,10 +224,13 @@ mod tests {
211
224
212
225
use super :: * ;
213
226
227
+ const CARDINALITY_LIMIT_DEFAULT : usize = 2000 ;
228
+
214
229
#[ test]
215
230
fn last_value_aggregation ( ) {
216
231
let AggregateFns { measure, collect } =
217
- AggregateBuilder :: < u64 > :: new ( Temporality :: Cumulative , None ) . last_value ( None ) ;
232
+ AggregateBuilder :: < u64 > :: new ( Temporality :: Cumulative , None , CARDINALITY_LIMIT_DEFAULT )
233
+ . last_value ( None ) ;
218
234
let mut a = MetricData :: Gauge ( Gauge {
219
235
data_points : vec ! [ GaugeDataPoint {
220
236
attributes: vec![ KeyValue :: new( "a" , 1 ) ] ,
@@ -244,7 +260,8 @@ mod tests {
244
260
fn precomputed_sum_aggregation ( ) {
245
261
for temporality in [ Temporality :: Delta , Temporality :: Cumulative ] {
246
262
let AggregateFns { measure, collect } =
247
- AggregateBuilder :: < u64 > :: new ( temporality, None ) . precomputed_sum ( true ) ;
263
+ AggregateBuilder :: < u64 > :: new ( temporality, None , CARDINALITY_LIMIT_DEFAULT )
264
+ . precomputed_sum ( true ) ;
248
265
let mut a = MetricData :: Sum ( Sum {
249
266
data_points : vec ! [
250
267
SumDataPoint {
@@ -290,7 +307,8 @@ mod tests {
290
307
fn sum_aggregation ( ) {
291
308
for temporality in [ Temporality :: Delta , Temporality :: Cumulative ] {
292
309
let AggregateFns { measure, collect } =
293
- AggregateBuilder :: < u64 > :: new ( temporality, None ) . sum ( true ) ;
310
+ AggregateBuilder :: < u64 > :: new ( temporality, None , CARDINALITY_LIMIT_DEFAULT )
311
+ . sum ( true ) ;
294
312
let mut a = MetricData :: Sum ( Sum {
295
313
data_points : vec ! [
296
314
SumDataPoint {
@@ -335,8 +353,9 @@ mod tests {
335
353
#[ test]
336
354
fn explicit_bucket_histogram_aggregation ( ) {
337
355
for temporality in [ Temporality :: Delta , Temporality :: Cumulative ] {
338
- let AggregateFns { measure, collect } = AggregateBuilder :: < u64 > :: new ( temporality, None )
339
- . explicit_bucket_histogram ( vec ! [ 1.0 ] , true , true ) ;
356
+ let AggregateFns { measure, collect } =
357
+ AggregateBuilder :: < u64 > :: new ( temporality, None , CARDINALITY_LIMIT_DEFAULT )
358
+ . explicit_bucket_histogram ( vec ! [ 1.0 ] , true , true ) ;
340
359
let mut a = MetricData :: Histogram ( Histogram {
341
360
data_points : vec ! [ HistogramDataPoint {
342
361
attributes: vec![ KeyValue :: new( "a1" , 1 ) ] ,
@@ -382,8 +401,9 @@ mod tests {
382
401
#[ test]
383
402
fn exponential_histogram_aggregation ( ) {
384
403
for temporality in [ Temporality :: Delta , Temporality :: Cumulative ] {
385
- let AggregateFns { measure, collect } = AggregateBuilder :: < u64 > :: new ( temporality, None )
386
- . exponential_bucket_histogram ( 4 , 20 , true , true ) ;
404
+ let AggregateFns { measure, collect } =
405
+ AggregateBuilder :: < u64 > :: new ( temporality, None , CARDINALITY_LIMIT_DEFAULT )
406
+ . exponential_bucket_histogram ( 4 , 20 , true , true ) ;
387
407
let mut a = MetricData :: ExponentialHistogram ( ExponentialHistogram {
388
408
data_points : vec ! [ ExponentialHistogramDataPoint {
389
409
attributes: vec![ KeyValue :: new( "a1" , 1 ) ] ,
0 commit comments