@@ -95,6 +95,7 @@ public class BacklogQuotaManagerTest {
9595 private static final Logger log = LoggerFactory .getLogger (BacklogQuotaManagerTest .class );
9696
9797 public static final String CLUSTER_NAME = "usc" ;
// Metric name that getQuotaCheckCount() looks up, filtered by the "cluster" label set to CLUSTER_NAME.
98+ private static final String QUOTA_CHECK_COUNT = "pulsar_storage_backlog_quota_check_duration_seconds_count" ;
9899 PulsarService pulsar ;
99100 ServiceConfiguration config ;
100101
@@ -934,11 +935,31 @@ private void unloadAndLoadTopic(String topic, Producer producer) throws PulsarAd
934935 }
935936
/**
 * Waits until the count returned by {@code getQuotaCheckCount()} has grown by at least two
 * relative to a baseline taken inside the wait loop.
 *
 * <p>The condition is polled once per second for at most
 * {@code TIME_TO_CHECK_BACKLOG_QUOTA * 3} seconds. Polls where the count is unavailable are
 * skipped. The baseline is the value seen by the first poll that returns a count, and that
 * poll never satisfies the wait by itself.
 */
936937 private void waitForQuotaCheckToRunTwice () {
937- final long initialQuotaCheckCount = getQuotaCheckCount ();
// Single-element arrays serve as mutable holders: the lambda below may only capture
// effectively-final locals, yet it has to record the baseline on its first successful poll.
938+ final long [] baselineCount = new long [1 ];
939+ final boolean [] baselineCaptured = new boolean [1 ];
940+
938941 Awaitility .await ()
939942 .pollInterval (1 , SECONDS )
940943 .atMost (TIME_TO_CHECK_BACKLOG_QUOTA * 3 , SECONDS )
941- .until (() -> getQuotaCheckCount () > initialQuotaCheckCount + 1 );
944+ .until (() -> {
945+ final java .util .OptionalLong countOpt = getQuotaCheckCount ();
946+
947+ // An empty result means no count could be read on this poll; report "not done" and retry.
948+ // The baseline is deliberately not taken until a poll actually returns a count.
949+ if (countOpt .isEmpty ()) {
950+ return false ;
951+ }
952+
953+ // First poll that returns a count: record it as the baseline and keep waiting.
954+ final long observedCount = countOpt .getAsLong ();
955+ if (!baselineCaptured [0 ]) {
956+ baselineCount [0 ] = observedCount ;
957+ baselineCaptured [0 ] = true ;
958+ return false ;
959+ }
960+
// Strictly greater than baseline + 1, i.e. the count rose by at least two after the baseline.
961+ return observedCount > baselineCount [0 ] + 1 ;
962+ });
942963 }
943964
944965 /**
@@ -952,12 +973,20 @@ private String waitForMarkDeletePositionToChange(String topic,
952973 markDeletePosition -> markDeletePosition != null && !markDeletePosition .equals (previousMarkDeletePosition ));
953974 }
954975
/**
 * Reads the current value of the {@code QUOTA_CHECK_COUNT} metric whose {@code cluster} label
 * equals {@code CLUSTER_NAME}.
 *
 * @return the value of the first matching sample cast to {@code long}, or an empty
 *         {@code OptionalLong} when no sample matches or when fetching or searching the
 *         metrics throws
 */
955- private long getQuotaCheckCount () {
956- Metrics metrics = prometheusMetricsClient .getMetrics ();
957- return (long ) metrics .findByNameAndLabels (
958- "pulsar_storage_backlog_quota_check_duration_seconds_count" ,
959- "cluster" , CLUSTER_NAME )
960- .get (0 ).value ;
976+ private java .util .OptionalLong getQuotaCheckCount () {
977+ try {
978+ final Metrics metrics = prometheusMetricsClient .getMetrics ();
979+ final java .util .List <Metric > matches =
980+ metrics .findByNameAndLabels (QUOTA_CHECK_COUNT , "cluster" , CLUSTER_NAME );
981+ if (matches .isEmpty ()) {
982+ // Nothing in this fetch matched the metric name and cluster label.
983+ return java .util .OptionalLong .empty ();
984+ }
// Only the first match is used; any further matches are ignored.
985+ return java .util .OptionalLong .of ((long ) matches .get (0 ).value );
986+ } catch (Exception e ) {
987+ // Any exception from the fetch or lookup is reported as "count not available".
// NOTE(review): the exception is discarded without logging, so a persistent failure would
// only show up as the caller's wait timing out - consider logging it.
988+ return java .util .OptionalLong .empty ();
989+ }
961990 }
962991
963992 /**
0 commit comments