@@ -79,7 +79,8 @@ public void testOverconsumingTokensWithBrokerPublishRateLimiter() throws Excepti
7979 int durationSeconds = 5 ;
8080 int numberOfConsumers = 5 ;
8181 int numberOfProducersWithIndependentClients = 5 ;
82- int numberOfMessagesForEachProducer = (rateInMsg * (durationSeconds + 1 )) / 5 ;
82+ int numberOfMessagesForEachProducer = (rateInMsg * (durationSeconds + 1 ))
83+ / numberOfProducersWithIndependentClients ;
8384
8485 // configure dispatch throttling rate
8586 BrokerService brokerService = pulsar .getBrokerService ();
@@ -102,12 +103,12 @@ public void testOverconsumingTokensWithBrokerPublishRateLimiter() throws Excepti
102103 AtomicInteger lastCalculatedSecond = new AtomicInteger (0 );
103104 List <Integer > collectedRatesForEachSecond = Collections .synchronizedList (new ArrayList <>());
104105
105- // track actual consuming rate of messages per second
106+ // rack actual consuming rate of messages per second
107+ // (After aligning the first message, start counting whole seconds)
106108 Runnable rateTracker = () -> {
107109 long startTime = startTimeNanos .get ();
108110 if (startTime == 0 ) {
109- startTimeNanos .compareAndSet (0 , System .nanoTime ());
110- startTime = startTimeNanos .get ();
111+ return ;
111112 }
112113 long durationNanos = System .nanoTime () - startTime ;
113114 int elapsedFullSeconds = (int ) (durationNanos / 1e9 );
@@ -124,10 +125,15 @@ public void testOverconsumingTokensWithBrokerPublishRateLimiter() throws Excepti
124125 ScheduledFuture <?> scheduledFuture = executor .scheduleAtFixedRate (rateTracker , 0 , 500 , TimeUnit .MILLISECONDS );
125126
126127 // message listener implementation used for all consumers
128+ // Set startTime when the first message arrives; then accumulate the counter
127129 MessageListener <Integer > messageListener = new MessageListener <>() {
128130 @ Override
129131 public void received (Consumer <Integer > consumer , Message <Integer > msg ) {
130- lastReceivedMessageTimeNanos .set (System .nanoTime ());
132+ long now = System .nanoTime ();
133+ if (startTimeNanos .get () == 0 ) {
134+ startTimeNanos .compareAndSet (0 , now );
135+ }
136+ lastReceivedMessageTimeNanos .set (now );
131137 currentSecondMessagesCount .incrementAndGet ();
132138 totalMessagesReceived .incrementAndGet ();
133139 consumer .acknowledgeAsync (msg );
@@ -233,33 +239,55 @@ public void received(Consumer<Integer> consumer, Message<Integer> msg) {
233239 });
234240 };
235241
236- // wait for results
242+ // Wait for all messages to be consumed
237243 Awaitility .await ()
238244 .atMost (Duration .ofSeconds (durationSeconds * 2 ))
239245 .pollInterval (Duration .ofMillis (100 ))
240- .untilAsserted (
241- () -> assertThat (collectedRatesForEachSecond ).hasSizeGreaterThanOrEqualTo (durationSeconds ));
246+ .untilAsserted (() ->
247+ assertThat (totalMessagesReceived .get ())
248+ .isEqualTo (numberOfProducersWithIndependentClients * numberOfMessagesForEachProducer ));
249+
250+ // Collect per-second windows, and add the last half-second remainder
242251 List <Integer > collectedRatesSnapshot = new ArrayList <>(collectedRatesForEachSecond );
252+ int tail = currentSecondMessagesCount .getAndSet (0 );
253+ if (tail > 0 ) {
254+ collectedRatesSnapshot .add (tail );
255+ }
243256 log .info ("Collected rates for each second: {}" , collectedRatesSnapshot );
244- long avgMsgRate =
245- totalMessagesReceived .get () / TimeUnit .NANOSECONDS .toSeconds (
246- lastReceivedMessageTimeNanos .get () - startTimeNanos .get ());
247- log .info ("Average rate during the test run: {} msg/s" , avgMsgRate );
257+
258+ // Calculate the average using second-by-second windows:
259+ // Skip the first second, take up to durationSeconds windows.
260+ int usable = Math .min (durationSeconds , Math .max (0 , collectedRatesSnapshot .size () - 1 ));
261+ double windowedAvg = (usable > 0 )
262+ ? collectedRatesSnapshot .subList (1 , 1 + usable ).stream ().mapToInt (Integer ::intValue ).average ().orElse (0 )
263+ : 0 ;
248264
249265 assertSoftly (softly -> {
250- // check the rate during the test run
251- softly .assertThat (avgMsgRate ). describedAs ( "average rate during the test run" )
252- // allow rate in 40% range
266+ // Overall average (window mean, ±40%)
267+ softly .assertThat (windowedAvg )
268+ . describedAs ( "windowed average rate during the test run" )
253269 .isCloseTo (rateInMsg , Percentage .withPercentage (40 ));
254270
255- // check that rates were collected
256- // skip the first element as it might contain messages for first 2 seconds
257- softly .assertThat (collectedRatesSnapshot .subList (1 , collectedRatesSnapshot .size () - 1 ))
258- .describedAs ("actual rates for each second" )
259- .allSatisfy (rates -> {
260- assertThat (rates ).describedAs ("actual rate for each second" )
271+ // Per second (average of two adjacent seconds, skip first/last pairs, ±55%)
272+ if (collectedRatesSnapshot .size () >= 4 ) {
273+ // Starting from (1, 2), ending at (size-2, size-1)
274+ for (int i = 2 ; i < collectedRatesSnapshot .size (); i ++) {
275+ int avg2 = (collectedRatesSnapshot .get (i - 1 ) + collectedRatesSnapshot .get (i )) / 2 ;
276+ softly .assertThat (avg2 )
277+ .describedAs ("Average of second %d and %d" , i , i + 1 )
278+ .isCloseTo (rateInMsg , Percentage .withPercentage (55 ));
279+ }
280+ } else {
281+ // Degenerates to: Skip head and tail with 50% tolerance (avoid false positives)
282+ // when there are too few windows
283+ if (collectedRatesSnapshot .size () > 2 ) {
284+ for (int i = 1 ; i < collectedRatesSnapshot .size () - 1 ; i ++) {
285+ softly .assertThat (collectedRatesSnapshot .get (i ))
286+ .describedAs ("rate of second %d (fallback check)" , i + 1 )
261287 .isCloseTo (rateInMsg , Percentage .withPercentage (50 ));
262- });
288+ }
289+ }
290+ }
263291 });
264292 scheduledFuture .cancel (true );
265293 producersClose .close ();
0 commit comments