Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3399,56 +3399,52 @@ public void testEpochEntriesAsByteBuffer() throws Exception {
assertEquals(expectedEpoch + " " + expectedStartOffset, bufferedReader.readLine());
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testCopyQuota(boolean quotaExceeded) throws Exception {
RemoteLogManager.RLMCopyTask task = setupRLMTask(quotaExceeded);

if (quotaExceeded) {
// Verify that the copy operation times out, since no segments can be copied due to quota being exceeded
assertThrows(AssertionFailedError.class, () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> task.copyLogSegmentsToRemote(mockLog)));

Map<org.apache.kafka.common.MetricName, KafkaMetric> allMetrics = metrics.metrics();
KafkaMetric avgMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-avg", "RemoteLogManager"));
KafkaMetric maxMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-max", "RemoteLogManager"));
assertEquals(quotaExceededThrottleTime, ((Double) avgMetric.metricValue()).longValue());
assertEquals(quotaExceededThrottleTime, ((Double) maxMetric.metricValue()).longValue());

// Verify the highest offset in remote storage is updated only once
ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture());
// Verify the highest offset in remote storage was -1L before the copy started
assertEquals(-1L, capture.getValue());
} else {
// Verify the copy operation completes within the timeout, since it does not need to wait for quota availability
assertTimeoutPreemptively(Duration.ofMillis(1000), () -> task.copyLogSegmentsToRemote(mockLog));

// Verify quota check was performed
verify(rlmCopyQuotaManager, times(1)).getThrottleTimeMs();
// Verify bytes to copy was recorded with the quota manager
verify(rlmCopyQuotaManager, times(1)).record(10);

Map<org.apache.kafka.common.MetricName, KafkaMetric> allMetrics = metrics.metrics();
KafkaMetric avgMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-avg", "RemoteLogManager"));
KafkaMetric maxMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-max", "RemoteLogManager"));
if (quotaExceeded) {
assertEquals(Double.NaN, avgMetric.metricValue());
assertEquals(Double.NaN, maxMetric.metricValue());
Comment on lines -3434 to -3436
Copy link
Contributor Author

@UladzislauBlok UladzislauBlok Nov 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

} else {
// Metrics are not created until they actually get recorded (e.g. if the quota is exceeded).
assertNull(avgMetric);
assertNull(maxMetric);
}
@Test
public void testCopyQuotaWhenQuotaExceeded() throws Exception {
RemoteLogManager.RLMCopyTask task = setupRLMTask(true);
// Verify that the copy operation times out, since no segments can be copied due to quota being exceeded
assertThrows(AssertionFailedError.class, () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> task.copyLogSegmentsToRemote(mockLog)));

// Verify the highest offset in remote storage is updated
ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture());
List<Long> capturedValues = capture.getAllValues();
// Verify the highest offset in remote storage was -1L before the copy
assertEquals(-1L, capturedValues.get(0).longValue());
// Verify it was updated to 149L after the copy
assertEquals(149L, capturedValues.get(1).longValue());
}
Map<org.apache.kafka.common.MetricName, KafkaMetric> allMetrics = metrics.metrics();
KafkaMetric avgMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-avg", "RemoteLogManager"));
KafkaMetric maxMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-max", "RemoteLogManager"));
assertEquals(quotaExceededThrottleTime, ((Double) avgMetric.metricValue()).longValue());
assertEquals(quotaExceededThrottleTime, ((Double) maxMetric.metricValue()).longValue());

// Verify the highest offset in remote storage is updated only once
ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture());
// Verify the highest offset in remote storage was -1L before the copy started
assertEquals(-1L, capture.getValue());
}

@Test
public void testCopyQuotaWhenQuotaNotExceeded() throws Exception {
RemoteLogManager.RLMCopyTask task = setupRLMTask(false);
// Verify the copy operation completes within the timeout, since it does not need to wait for quota availability
assertTimeoutPreemptively(Duration.ofMillis(1000), () -> task.copyLogSegmentsToRemote(mockLog));

// Verify quota check was performed
verify(rlmCopyQuotaManager, times(1)).getThrottleTimeMs();
// Verify bytes to copy was recorded with the quota manager
verify(rlmCopyQuotaManager, times(1)).record(10);

Map<org.apache.kafka.common.MetricName, KafkaMetric> allMetrics = metrics.metrics();
KafkaMetric avgMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-avg", "RemoteLogManager"));
KafkaMetric maxMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-max", "RemoteLogManager"));

// Metrics are not created until they actually get recorded (e.g. if the quota is exceeded).
assertNull(avgMetric);
assertNull(maxMetric);

// Verify the highest offset in remote storage is updated
ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture());
List<Long> capturedValues = capture.getAllValues();
// Verify the highest offset in remote storage was -1L before the copy
assertEquals(-1L, capturedValues.get(0).longValue());
// Verify it was updated to 149L after the copy
assertEquals(149L, capturedValues.get(1).longValue());
}

@Test
Expand Down