diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java index 5b0676088e1e5..3a3e3ff8eef9c 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java @@ -3399,56 +3399,52 @@ public void testEpochEntriesAsByteBuffer() throws Exception { assertEquals(expectedEpoch + " " + expectedStartOffset, bufferedReader.readLine()); } - @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void testCopyQuota(boolean quotaExceeded) throws Exception { - RemoteLogManager.RLMCopyTask task = setupRLMTask(quotaExceeded); - - if (quotaExceeded) { - // Verify that the copy operation times out, since no segments can be copied due to quota being exceeded - assertThrows(AssertionFailedError.class, () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> task.copyLogSegmentsToRemote(mockLog))); - - Map allMetrics = metrics.metrics(); - KafkaMetric avgMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-avg", "RemoteLogManager")); - KafkaMetric maxMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-max", "RemoteLogManager")); - assertEquals(quotaExceededThrottleTime, ((Double) avgMetric.metricValue()).longValue()); - assertEquals(quotaExceededThrottleTime, ((Double) maxMetric.metricValue()).longValue()); - - // Verify the highest offset in remote storage is updated only once - ArgumentCaptor capture = ArgumentCaptor.forClass(Long.class); - verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture()); - // Verify the highest offset in remote storage was -1L before the copy started - assertEquals(-1L, capture.getValue()); - } else { - // Verify the copy operation completes within the timeout, since it does not need to wait for quota availability - assertTimeoutPreemptively(Duration.ofMillis(1000), () -> task.copyLogSegmentsToRemote(mockLog)); - - // Verify quota check was performed - verify(rlmCopyQuotaManager, times(1)).getThrottleTimeMs(); - // Verify bytes to copy was recorded with the quota manager - verify(rlmCopyQuotaManager, times(1)).record(10); - - Map allMetrics = metrics.metrics(); - KafkaMetric avgMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-avg", "RemoteLogManager")); - KafkaMetric maxMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-max", "RemoteLogManager")); - if (quotaExceeded) { - assertEquals(Double.NaN, avgMetric.metricValue()); - assertEquals(Double.NaN, maxMetric.metricValue()); - } else { - // Metrics are not created until they actually get recorded (e.g. if the quota is exceeded). - assertNull(avgMetric); - assertNull(maxMetric); - } + @Test + public void testCopyQuotaWhenQuotaExceeded() throws Exception { + RemoteLogManager.RLMCopyTask task = setupRLMTask(true); + // Verify that the copy operation times out, since no segments can be copied due to quota being exceeded + assertThrows(AssertionFailedError.class, () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> task.copyLogSegmentsToRemote(mockLog))); - // Verify the highest offset in remote storage is updated - ArgumentCaptor capture = ArgumentCaptor.forClass(Long.class); - verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture()); - List capturedValues = capture.getAllValues(); - // Verify the highest offset in remote storage was -1L before the copy - assertEquals(-1L, capturedValues.get(0).longValue()); - // Verify it was updated to 149L after the copy - assertEquals(149L, capturedValues.get(1).longValue()); - } + Map allMetrics = metrics.metrics(); + KafkaMetric avgMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-avg", "RemoteLogManager")); + KafkaMetric maxMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-max", "RemoteLogManager")); + assertEquals(quotaExceededThrottleTime, ((Double) avgMetric.metricValue()).longValue()); + assertEquals(quotaExceededThrottleTime, ((Double) maxMetric.metricValue()).longValue()); + + // Verify the highest offset in remote storage is updated only once + ArgumentCaptor capture = ArgumentCaptor.forClass(Long.class); + verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture()); + // Verify the highest offset in remote storage was -1L before the copy started + assertEquals(-1L, capture.getValue()); + } + + @Test + public void testCopyQuotaWhenQuotaNotExceeded() throws Exception { + RemoteLogManager.RLMCopyTask task = setupRLMTask(false); + // Verify the copy operation completes within the timeout, since it does not need to wait for quota availability + assertTimeoutPreemptively(Duration.ofMillis(1000), () -> task.copyLogSegmentsToRemote(mockLog)); + + // Verify quota check was performed + verify(rlmCopyQuotaManager, times(1)).getThrottleTimeMs(); + // Verify bytes to copy was recorded with the quota manager + verify(rlmCopyQuotaManager, times(1)).record(10); + + Map allMetrics = metrics.metrics(); + KafkaMetric avgMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-avg", "RemoteLogManager")); + KafkaMetric maxMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-max", "RemoteLogManager")); + + // Metrics are not created until they actually get recorded (e.g. if the quota is exceeded). + assertNull(avgMetric); + assertNull(maxMetric); + + // Verify the highest offset in remote storage is updated + ArgumentCaptor capture = ArgumentCaptor.forClass(Long.class); + verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture()); + List capturedValues = capture.getAllValues(); + // Verify the highest offset in remote storage was -1L before the copy + assertEquals(-1L, capturedValues.get(0).longValue()); + // Verify it was updated to 149L after the copy + assertEquals(149L, capturedValues.get(1).longValue()); } @Test