@@ -10,7 +10,7 @@ use chrono::{DateTime, Utc};
10
10
use mas_storage:: { RepositoryAccess , RepositoryError , user:: BrowserSessionRepository } ;
11
11
use opentelemetry:: {
12
12
Key , KeyValue ,
13
- metrics:: { Counter , Histogram } ,
13
+ metrics:: { Counter , Gauge , Histogram } ,
14
14
} ;
15
15
use sqlx:: PgPool ;
16
16
use tokio_util:: sync:: CancellationToken ;
@@ -45,6 +45,7 @@ struct ActivityRecord {
45
45
pub struct Worker {
46
46
pool : PgPool ,
47
47
pending_records : HashMap < ( SessionKind , Ulid ) , ActivityRecord > ,
48
+ pending_records_gauge : Gauge < u64 > ,
48
49
message_counter : Counter < u64 > ,
49
50
flush_time_histogram : Histogram < u64 > ,
50
51
}
@@ -80,9 +81,17 @@ impl Worker {
80
81
. with_unit ( "ms" )
81
82
. build ( ) ;
82
83
84
+ let pending_records_gauge = METER
85
+ . u64_gauge ( "mas.activity_tracker.pending_records" )
86
+ . with_description ( "The number of pending activity records" )
87
+ . with_unit ( "{records}" )
88
+ . build ( ) ;
89
+ pending_records_gauge. record ( 0 , & [ ] ) ;
90
+
83
91
Self {
84
92
pool,
85
93
pending_records : HashMap :: with_capacity ( MAX_PENDING_RECORDS ) ,
94
+ pending_records_gauge,
86
95
message_counter,
87
96
flush_time_histogram,
88
97
}
@@ -165,6 +174,10 @@ impl Worker {
165
174
let _ = tx. send ( ( ) ) ;
166
175
}
167
176
}
177
+
178
+ // Update the gauge
179
+ self . pending_records_gauge
180
+ . record ( self . pending_records . len ( ) as u64 , & [ ] ) ;
168
181
}
169
182
170
183
// Flush one last time
0 commit comments