1515 */
1616package io .micrometer .prometheus .rsocket ;
1717
18- import java .io .IOException ;
19- import java .io .UncheckedIOException ;
20- import java .nio .channels .ClosedChannelException ;
21- import java .security .KeyFactory ;
22- import java .security .KeyPair ;
23- import java .security .KeyPairGenerator ;
24- import java .security .NoSuchAlgorithmException ;
25- import java .security .PrivateKey ;
26- import java .security .spec .PKCS8EncodedKeySpec ;
27- import java .util .concurrent .atomic .AtomicReference ;
28- import java .util .stream .Collectors ;
29-
30- import javax .annotation .PostConstruct ;
31- import javax .crypto .Cipher ;
32- import javax .crypto .SecretKey ;
33- import javax .crypto .spec .SecretKeySpec ;
34-
3518import io .micrometer .core .instrument .Counter ;
3619import io .micrometer .core .instrument .DistributionSummary ;
20+ import io .micrometer .core .instrument .Tags ;
3721import io .micrometer .core .instrument .Timer ;
3822import io .micrometer .prometheus .PrometheusMeterRegistry ;
3923import io .netty .buffer .ByteBuf ;
4731import io .rsocket .transport .netty .server .TcpServerTransport ;
4832import io .rsocket .transport .netty .server .WebsocketServerTransport ;
4933import io .rsocket .util .DefaultPayload ;
50- import org .pcollections .HashTreePMap ;
51- import org .pcollections .PMap ;
34+ import org .springframework .stereotype .Controller ;
35+ import org .springframework .web .bind .annotation .GetMapping ;
36+ import org .springframework .web .bind .annotation .RestController ;
5237import org .xerial .snappy .Snappy ;
5338import reactor .core .publisher .Flux ;
5439import reactor .core .publisher .Mono ;
5540
56- import org .springframework .stereotype .Controller ;
57- import org .springframework .web .bind .annotation .GetMapping ;
58- import org .springframework .web .bind .annotation .RestController ;
41+ import javax .annotation .PostConstruct ;
42+ import javax .crypto .Cipher ;
43+ import javax .crypto .SecretKey ;
44+ import javax .crypto .spec .SecretKeySpec ;
45+ import java .io .IOException ;
46+ import java .io .UncheckedIOException ;
47+ import java .nio .channels .ClosedChannelException ;
48+ import java .security .*;
49+ import java .security .spec .PKCS8EncodedKeySpec ;
50+ import java .util .Map ;
51+ import java .util .concurrent .ConcurrentHashMap ;
52+ import java .util .stream .Collectors ;
5953
6054/**
6155 * A {@link Controller} for endpoints to be scraped by Prometheus.
@@ -69,25 +63,23 @@ class PrometheusController {
6963 private final Timer scrapeTimerSuccess ;
7064 private final Timer scrapeTimerClosed ;
7165 private final Counter scrapeSocketsClosed ;
72- private final Timer scrapeTimerError ;
7366 private final DistributionSummary scrapePayload ;
7467 private final MicrometerRSocketInterceptor metricsInterceptor ;
7568 private PrometheusControllerProperties properties ;
76- private AtomicReference < PMap < RSocket , ConnectionState >> scrapableApps = new AtomicReference <>(HashTreePMap . empty () );
69+ private Map < RSocket , ConnectionState > scrapableApps = new ConcurrentHashMap <>();
7770
7871 PrometheusController (PrometheusMeterRegistry meterRegistry , PrometheusControllerProperties properties ) {
7972 this .meterRegistry = meterRegistry ;
8073 this .metricsInterceptor = new MicrometerRSocketInterceptor (meterRegistry );
8174 this .properties = properties ;
82- meterRegistry .gauge ("prometheus.proxy.scrape.active.connections" , scrapableApps , apps -> apps . get (). size () );
75+ meterRegistry .gaugeMapSize ("prometheus.proxy.scrape.active.connections" , Tags . empty (), scrapableApps );
8376
8477 this .scrapeTimerSuccess = Timer .builder ("prometheus.proxy.scrape" )
8578 .tag ("outcome" , "success" )
8679 .publishPercentileHistogram ()
8780 .register (meterRegistry );
8881
8982 this .scrapeTimerClosed = meterRegistry .timer ("prometheus.proxy.scrape" , "outcome" , "closed" );
90- this .scrapeTimerError = meterRegistry .timer ("prometheus.proxy.scrape" , "outcome" , "error" );
9183 this .scrapePayload = DistributionSummary .builder ("prometheus.proxy.scrape.payload" )
9284 .publishPercentileHistogram ()
9385 .baseUnit ("bytes" )
@@ -119,7 +111,7 @@ private Mono<RSocket> acceptRSocket(KeyPairGenerator generator, RSocket sendingS
119111 // respond with Mono.error(..) to
120112
121113 ConnectionState connectionState = new ConnectionState (generator .generateKeyPair ());
122- scrapableApps .getAndUpdate ( apps -> apps . plus ( metricsInterceptor .apply (sendingSocket ), connectionState ) );
114+ scrapableApps .put ( metricsInterceptor .apply (sendingSocket ), connectionState );
123115
124116 // for use by the client to push metrics as it's dying if this happens before the first scrape
125117 sendingSocket .fireAndForget (connectionState .createKeyPayload ())
@@ -147,7 +139,7 @@ public Mono<String> proxyMetrics() {
147139 @ GetMapping (value = "/metrics/connected" , produces = "text/plain" )
148140 public Mono <String > prometheus () {
149141 return Flux
150- .fromIterable (scrapableApps .get (). entrySet ())
142+ .fromIterable (scrapableApps .entrySet ())
151143 .flatMap (socketAndState -> {
152144 ConnectionState connectionState = socketAndState .getValue ();
153145 RSocket rsocket = socketAndState .getKey ();
@@ -158,11 +150,14 @@ public Mono<String> prometheus() {
158150 .onErrorResume (throwable -> {
159151 if (throwable instanceof ClosedChannelException ) {
160152 scrapeSocketsClosed .increment ();
161- scrapableApps .getAndUpdate ( apps -> apps . minus ( rsocket ) );
153+ scrapableApps .remove ( rsocket );
162154 sample .stop (scrapeTimerClosed );
163155 return connectionState .getDyingPush ();
164156 }
165- sample .stop (scrapeTimerError );
157+
158+ sample .stop (meterRegistry .timer ("prometheus.proxy.scrape" , "outcome" , "error" ,
159+ "exception" , throwable .getClass ().getName ()));
160+
166161 return Mono .empty ();
167162 });
168163 })
@@ -171,6 +166,7 @@ public Mono<String> prometheus() {
171166
172167 class ConnectionState {
173168 private final KeyPair keyPair ;
169+
174170 // the last metrics of a dying application instance
175171 private String dyingPush ;
176172
0 commit comments