From 3cb5b8bab3e475e318512dbfc6d1cc5ec93a813c Mon Sep 17 00:00:00 2001 From: Alex Christensen Date: Wed, 29 Oct 2025 17:08:35 +0000 Subject: [PATCH] M365 Crawler Metric, Buffer, and Unit Test Updates Signed-off-by: Alex Christensen --- .../microsoft-office365-source/build.gradle | 1 + .../Office365CrawlerClient.java | 25 +-- .../Office365RestClient.java | 30 +++- .../Office365RestClientTest.java | 148 +++++++++++++++++- .../base/TokenPaginationCrawler.java | 9 +- .../base/TokenPaginationCrawlerTest.java | 73 +++++++++ 6 files changed, 269 insertions(+), 17 deletions(-) diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/build.gradle b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/build.gradle index c6e75fb0dd..dc2abe25c1 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/build.gradle +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/build.gradle @@ -29,6 +29,7 @@ dependencies { annotationProcessor 'org.projectlombok:lombok:1.18.30' testImplementation project(':data-prepper-test:test-common') + testRuntimeOnly 'org.slf4j:slf4j-simple:2.0.7' testImplementation testLibs.awaitility implementation(libs.spring.context) { diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java index 3d928cda69..249d281da2 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java @@ -106,9 +106,9 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state try { String nextPageUri = null; - List> records = new ArrayList<>(); do { + List> records = new ArrayList<>(); AuditLogsResponse response = service.searchAuditLogs(logType, startTime, endTime, nextPageUri); @@ -138,18 +138,22 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state } } + // Write Records to the buffer after processing a page of data + bufferWriteLatencyTimer.record(() -> { + try { + writeRecordsWithRetry(records, buffer, acknowledgementSet); + } catch (Exception e) { + bufferWriteFailuresCounter.increment(); + throw e; + } + }); + nextPageUri = response.getNextPageUri(); } while (nextPageUri != null); - bufferWriteLatencyTimer.record(() -> { - try { - writeRecordsWithRetry(records, buffer, acknowledgementSet); - } catch (Exception e) { - bufferWriteFailuresCounter.increment(); - throw e; - } - }); - + if (configuration.isAcknowledgments()) { + acknowledgementSet.complete(); + } } catch (Exception e) { log.error(NOISY, "Failed to process partition for log type {} from {} to {}", logType, startTime, endTime, e); @@ -212,7 +216,6 @@ private void writeRecordsWithRetry(final List> records, if (configuration.isAcknowledgments()) { records.forEach(record -> acknowledgementSet.add(record.getData())); buffer.writeAll(records, (int) Duration.ofSeconds(BUFFER_TIMEOUT_IN_SECONDS).toMillis()); - acknowledgementSet.complete(); } else { buffer.writeAll(records, (int) Duration.ofSeconds(BUFFER_TIMEOUT_IN_SECONDS).toMillis()); } diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java index ca8eae90ff..93a72b5d39 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java @@ -10,6 +10,7 @@ package org.opensearch.dataprepper.plugins.source.microsoft_office365; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Timer; import lombok.extern.slf4j.Slf4j; import org.opensearch.dataprepper.metrics.PluginMetrics; @@ -26,6 +27,7 @@ import org.springframework.web.client.RestTemplate; import javax.inject.Named; +import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.List; import java.util.Map; @@ -41,10 +43,13 @@ @Named public class Office365RestClient { private static final String AUDIT_LOG_FETCH_LATENCY = "auditLogFetchLatency"; - private static final String SEARCH_CALL_LATENCY = "searchCallLatency"; - private static final String AUDIT_LOGS_REQUESTED = "auditLogsRequested"; + private static final String AUDIT_LOG_RESPONSE_SIZE = "auditLogResponseSizeBytes"; private static final String AUDIT_LOG_REQUESTS_FAILED = "auditLogRequestsFailed"; private static final String AUDIT_LOG_REQUESTS_SUCCESS = "auditLogRequestsSuccess"; + private static final String AUDIT_LOGS_REQUESTED = "auditLogsRequested"; + private static final String SEARCH_CALL_LATENCY = "searchCallLatency"; + private static final String SEARCH_RESPONSE_SIZE = "searchResponseSizeBytes"; + private static final String SEARCH_REQUESTS_SUCCESS = "searchRequestsSuccess"; private static final String SEARCH_REQUESTS_FAILED = "searchRequestsFailed"; private static final String MANAGEMENT_API_BASE_URL = "https://manage.office.com/api/v1.0/"; @@ -56,6 +61,9 @@ public class Office365RestClient { private final Counter auditLogRequestsFailedCounter; private final Counter auditLogRequestsSuccessCounter; private final Counter searchRequestsFailedCounter; + private final Counter searchRequestsSuccessCounter; + private final DistributionSummary auditLogResponseSizeSummary; + private final DistributionSummary searchResponseSizeSummary; public Office365RestClient(final Office365AuthenticationInterface authConfig, final PluginMetrics pluginMetrics) { @@ -67,6 +75,9 @@ public Office365RestClient(final Office365AuthenticationInterface authConfig, this.auditLogRequestsFailedCounter = pluginMetrics.counter(AUDIT_LOG_REQUESTS_FAILED); this.auditLogRequestsSuccessCounter = pluginMetrics.counter(AUDIT_LOG_REQUESTS_SUCCESS); this.searchRequestsFailedCounter = pluginMetrics.counter(SEARCH_REQUESTS_FAILED); + this.searchRequestsSuccessCounter = pluginMetrics.counter(SEARCH_REQUESTS_SUCCESS); + this.auditLogResponseSizeSummary = pluginMetrics.summary(AUDIT_LOG_RESPONSE_SIZE); + this.searchResponseSizeSummary = pluginMetrics.summary(SEARCH_RESPONSE_SIZE); } /** @@ -170,6 +181,8 @@ public AuditLogsResponse searchAuditLogs(final String contentType, new HttpEntity<>(headers), new ParameterizedTypeReference<>() {} ); + // Record search request size. + searchResponseSizeSummary.record(response.getHeaders().getContentLength()); // Extract NextPageUri from response headers List nextPageHeaders = response.getHeaders().get("NextPageUri"); @@ -180,6 +193,7 @@ public AuditLogsResponse searchAuditLogs(final String contentType, log.debug("Next page URI found: {}", nextPageUri); } + searchRequestsSuccessCounter.increment(); return new AuditLogsResponse(response.getBody(), nextPageUri); }, authConfig::renewCredentials, @@ -210,12 +224,20 @@ public String getAuditLog(String contentUri) { try { String response = RetryHandler.executeWithRetry(() -> { headers.setBearerAuth(authConfig.getAccessToken()); - return restTemplate.exchange( + ResponseEntity responseEntity = restTemplate.exchange( contentUri, HttpMethod.GET, new HttpEntity<>(headers), String.class - ).getBody(); + ); + + // Record audit log request size from response body + String responseBody = responseEntity.getBody(); + if (responseBody != null) { + auditLogResponseSizeSummary.record(responseBody.getBytes(StandardCharsets.UTF_8).length); + } + + return responseBody; }, authConfig::renewCredentials, auditLogRequestsFailedCounter); auditLogRequestsSuccessCounter.increment(); return response; diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java index dc3393e23d..a1828b61ae 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java @@ -9,10 +9,13 @@ package org.opensearch.dataprepper.plugins.source.microsoft_office365; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Timer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import io.micrometer.core.instrument.Counter; + import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -347,4 +350,147 @@ void testGetAuditLogFailureCounterIncrementsOnEachRetry() throws Exception { // Verify counter.increment() was called exactly 6 times (once for each retry attempt) verify(mockCounter, times(6)).increment(); } + + @Test + void testMetricsInitialization() { + // Test metrics initialization during construction. This approach is used for metrics that are called + // inside RetryHandler.executeWithRetry() static method calls, which would require complex static mocking + // to test for invocation. Testing initialization ensures the metrics infrastructure is properly set up. + + // Mock all required timers and counters for Office365RestClient constructor + PluginMetrics mockPluginMetrics = org.mockito.Mockito.mock(PluginMetrics.class); + Timer mockAuditLogFetchLatencyTimer = org.mockito.Mockito.mock(Timer.class); + Timer mockSearchCallLatencyTimer = org.mockito.Mockito.mock(Timer.class); + Counter mockAuditLogsRequestedCounter = org.mockito.Mockito.mock(Counter.class); + Counter mockAuditLogRequestsFailedCounter = org.mockito.Mockito.mock(Counter.class); + Counter mockAuditLogRequestsSuccessCounter = org.mockito.Mockito.mock(Counter.class); + Counter mockSearchRequestsFailedCounter = org.mockito.Mockito.mock(Counter.class); + Counter mockSearchRequestsSuccessCounter = org.mockito.Mockito.mock(Counter.class); + DistributionSummary mockAuditLogRequestSizeSummary = org.mockito.Mockito.mock(DistributionSummary.class); + DistributionSummary mockSearchRequestSizeSummary = org.mockito.Mockito.mock(DistributionSummary.class); + + when(mockPluginMetrics.timer("auditLogFetchLatency")).thenReturn(mockAuditLogFetchLatencyTimer); + when(mockPluginMetrics.timer("searchCallLatency")).thenReturn(mockSearchCallLatencyTimer); + when(mockPluginMetrics.counter("auditLogsRequested")).thenReturn(mockAuditLogsRequestedCounter); + when(mockPluginMetrics.counter("auditLogRequestsFailed")).thenReturn(mockAuditLogRequestsFailedCounter); + when(mockPluginMetrics.counter("auditLogRequestsSuccess")).thenReturn(mockAuditLogRequestsSuccessCounter); + when(mockPluginMetrics.counter("searchRequestsFailed")).thenReturn(mockSearchRequestsFailedCounter); + when(mockPluginMetrics.counter("searchRequestsSuccess")).thenReturn(mockSearchRequestsSuccessCounter); + when(mockPluginMetrics.summary("auditLogResponseSizeBytes")).thenReturn(mockAuditLogRequestSizeSummary); + when(mockPluginMetrics.summary("searchResponseSizeBytes")).thenReturn(mockSearchRequestSizeSummary); + + // Create Office365RestClient with mocked metrics + Office365RestClient testClient = new Office365RestClient(authConfig, mockPluginMetrics); + + // Verify all metrics were requested during construction + verify(mockPluginMetrics).timer("auditLogFetchLatency"); + verify(mockPluginMetrics).timer("searchCallLatency"); + verify(mockPluginMetrics).counter("auditLogsRequested"); + verify(mockPluginMetrics).counter("auditLogRequestsFailed"); + verify(mockPluginMetrics).counter("auditLogRequestsSuccess"); + verify(mockPluginMetrics).counter("searchRequestsFailed"); + verify(mockPluginMetrics).counter("searchRequestsSuccess"); + verify(mockPluginMetrics).summary("auditLogResponseSizeBytes"); + verify(mockPluginMetrics).summary("searchResponseSizeBytes"); + } + + @Test + void testGetAuditLogMetricsInvocation() throws NoSuchFieldException, IllegalAccessException { + // Test metrics for getAuditLog() method - both success and failure scenarios + + // Create mock metrics and inject them + Counter mockAuditLogsRequestedCounter = org.mockito.Mockito.mock(Counter.class); + Counter mockAuditLogRequestsFailedCounter = org.mockito.Mockito.mock(Counter.class); + Timer mockAuditLogFetchLatencyTimer = org.mockito.Mockito.mock(Timer.class); + DistributionSummary mockAuditLogRequestSizeSummary = org.mockito.Mockito.mock(DistributionSummary.class); + + ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "auditLogsRequestedCounter", mockAuditLogsRequestedCounter); + ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "auditLogRequestsFailedCounter", mockAuditLogRequestsFailedCounter); + ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "auditLogFetchLatencyTimer", mockAuditLogFetchLatencyTimer); + ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "auditLogResponseSizeSummary", mockAuditLogRequestSizeSummary); + + // Mock timer.record() to execute the lambda + when(mockAuditLogFetchLatencyTimer.record(any(java.util.function.Supplier.class))).thenAnswer(invocation -> { + java.util.function.Supplier supplier = invocation.getArgument(0); + return supplier.get(); + }); + + String contentUri = "https://manage.office.com/api/v1.0/test-tenant/activity/feed/audit/123"; + + // Test success scenario + String mockAuditLog = "{\"id\":\"123\",\"contentType\":\"Audit.AzureActiveDirectory\"}"; + ResponseEntity mockResponse = new ResponseEntity<>(mockAuditLog, HttpStatus.OK); + when(restTemplate.exchange(eq(contentUri), eq(HttpMethod.GET), any(), eq(String.class))) + .thenReturn(mockResponse); + + office365RestClient.getAuditLog(contentUri); + + // Verify success metrics + verify(mockAuditLogsRequestedCounter).increment(); // Called directly before RetryHandler + verify(mockAuditLogFetchLatencyTimer).record(any(java.util.function.Supplier.class)); // Timer wrapper + verify(mockAuditLogRequestSizeSummary).record(mockAuditLog.getBytes(java.nio.charset.StandardCharsets.UTF_8).length); // Size metric inside RetryHandler + + // Test failure scenario + when(restTemplate.exchange(eq(contentUri), eq(HttpMethod.GET), any(), eq(String.class))) + .thenThrow(new HttpClientErrorException(HttpStatus.INTERNAL_SERVER_ERROR)); + + assertThrows(RuntimeException.class, () -> office365RestClient.getAuditLog(contentUri)); + + // Verify failure metrics + verify(mockAuditLogsRequestedCounter, times(2)).increment(); // Called again before retry + verify(mockAuditLogRequestsFailedCounter, times(6)).increment(); // Called 6 times (once for each retry attempt) + } + + @Test + void testSearchAuditLogsMetricsInvocation() throws NoSuchFieldException, IllegalAccessException { + // Test metrics for searchAuditLogs() method - both success and failure scenarios + + // Create mock metrics and inject them + Counter mockSearchRequestsFailedCounter = org.mockito.Mockito.mock(Counter.class); + Timer mockSearchCallLatencyTimer = org.mockito.Mockito.mock(Timer.class); + DistributionSummary mockSearchRequestSizeSummary = org.mockito.Mockito.mock(DistributionSummary.class); + + ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "searchRequestsFailedCounter", mockSearchRequestsFailedCounter); + ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "searchCallLatencyTimer", mockSearchCallLatencyTimer); + ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "searchResponseSizeSummary", mockSearchRequestSizeSummary); + + // Mock timer.record() to execute the lambda + when(mockSearchCallLatencyTimer.record(any(java.util.function.Supplier.class))).thenAnswer(invocation -> { + java.util.function.Supplier supplier = invocation.getArgument(0); + return supplier.get(); + }); + + // Test success scenario + List> mockResults = Collections.singletonList(new HashMap<>()); + HttpHeaders responseHeaders = new HttpHeaders(); + responseHeaders.setContentLength(1024L); // Mock content length + ResponseEntity>> mockResponse = new ResponseEntity<>(mockResults, responseHeaders, HttpStatus.OK); + when(restTemplate.exchange(anyString(), eq(HttpMethod.GET), any(), any(ParameterizedTypeReference.class))) + .thenReturn(mockResponse); + + office365RestClient.searchAuditLogs( + "Audit.AzureActiveDirectory", + Instant.now().minus(1, ChronoUnit.HOURS), + Instant.now(), + null + ); + + // Verify success metrics + verify(mockSearchCallLatencyTimer).record(any(java.util.function.Supplier.class)); // Timer wrapper + verify(mockSearchRequestSizeSummary).record(1024L); // Size metric inside RetryHandler + + // Test failure scenario + when(restTemplate.exchange(anyString(), eq(HttpMethod.GET), any(), any(ParameterizedTypeReference.class))) + .thenThrow(new HttpClientErrorException(HttpStatus.INTERNAL_SERVER_ERROR)); + + assertThrows(RuntimeException.class, () -> office365RestClient.searchAuditLogs( + "Audit.AzureActiveDirectory", + Instant.now().minus(1, ChronoUnit.HOURS), + Instant.now(), + null + )); + + // Verify failure metrics + verify(mockSearchRequestsFailedCounter, times(6)).increment(); // Called 6 times (once for each retry attempt) + } } diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/TokenPaginationCrawler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/TokenPaginationCrawler.java index 17bef9afe1..ca6a9c378f 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/TokenPaginationCrawler.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/TokenPaginationCrawler.java @@ -33,7 +33,11 @@ public class TokenPaginationCrawler implements Crawler> buffer, AcknowledgementSet acknowledgementSet) { - client.executePartition(state, buffer, acknowledgementSet); + partitionWaitTimeTimer.record(Duration.between(state.getExportStartTime(), Instant.now())); + partitionProcessLatencyTimer.record(() -> client.executePartition(state, buffer, acknowledgementSet)); } private void updateLeaderProgressState(LeaderPartition leaderPartition, String updatedToken, EnhancedSourceCoordinator coordinator) { diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/TokenPaginationCrawlerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/TokenPaginationCrawlerTest.java index 9bc2daee78..02aa19468e 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/TokenPaginationCrawlerTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/TokenPaginationCrawlerTest.java @@ -1,5 +1,7 @@ package org.opensearch.dataprepper.plugins.source.source_crawler.base; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -18,6 +20,7 @@ import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; import org.opensearch.dataprepper.plugins.source.source_crawler.model.TestItemInfo; +import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; @@ -33,6 +36,9 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; import static org.mockito.internal.verification.VerificationModeFactory.times; @ExtendWith(MockitoExtension.class) @@ -51,6 +57,17 @@ public class TokenPaginationCrawlerTest { private PaginationCrawlerWorkerProgressState state; @Mock private LeaderPartition leaderPartition; + @Mock + private Timer partitionWaitTimeTimer; + @Mock + private Timer partitionProcessLatencyTimer; + @Mock + private Timer crawlingTimer; + @Mock + private Counter mockCounter; + @Mock + private PluginMetrics mockPluginMetrics; + private Crawler crawler; private final PluginMetrics pluginMetrics = PluginMetrics.fromNames("CrawlerTest", "crawler"); @@ -69,10 +86,66 @@ public void crawlerConstructionTest() { @Test public void executePartitionTest() { reset(leaderPartition); + // Mock the getExportStartTime() method to return a valid Instant + when(state.getExportStartTime()).thenReturn(Instant.now().minusSeconds(1)); crawler.executePartition(state, buffer, acknowledgementSet); verify(client).executePartition(state, buffer, acknowledgementSet); } + @Test + public void test_metrics_in_crawler() { + reset(leaderPartition); + + // Mock all required timers and counters for TokenPaginationCrawler constructor + Timer mockCrawlingTimer = mock(Timer.class); + Counter mockPartitionsCreatedCounter = mock(Counter.class); + Counter mockInvalidItemsCounter = mock(Counter.class); + + when(mockPluginMetrics.timer("crawlingTime")).thenReturn(mockCrawlingTimer); + when(mockPluginMetrics.timer("WorkerPartitionWaitTime")).thenReturn(partitionWaitTimeTimer); + when(mockPluginMetrics.timer("WorkerPartitionProcessLatency")).thenReturn(partitionProcessLatencyTimer); + when(mockPluginMetrics.counter("paginationWorkerPartitionsCreated")).thenReturn(mockPartitionsCreatedCounter); + when(mockPluginMetrics.counter("invalidPaginationItems")).thenReturn(mockInvalidItemsCounter); + + TokenPaginationCrawler testCrawler = new TokenPaginationCrawler(client, mockPluginMetrics); + + // Test executePartition method which uses partitionWaitTimeTimer and partitionProcessLatencyTimer + when(state.getExportStartTime()).thenReturn(Instant.now().minusSeconds(1)); + + doAnswer(invocation -> { + Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }).when(partitionProcessLatencyTimer).record(any(Runnable.class)); + doNothing().when(partitionWaitTimeTimer).record(any(Duration.class)); + + testCrawler.executePartition(state, buffer, acknowledgementSet); + + // Verify executePartition metrics + verify(partitionProcessLatencyTimer).record(any(Runnable.class)); + verify(partitionWaitTimeTimer).record(any(Duration.class)); + + // Test crawl method which uses crawlingTimer, parititionsCreatedCounter, and invalidPaginationItemsCounter + // Re-setup leaderPartition mock after reset + when(leaderPartition.getProgressState()).thenReturn(Optional.of(new TokenPaginationCrawlerLeaderProgressState(INITIAL_TOKEN))); + + List itemInfoList = new ArrayList<>(); + itemInfoList.add(null); // This will trigger invalidPaginationItemsCounter + itemInfoList.add(new TestItemInfo("testId1")); + + when(client.listItems(INITIAL_TOKEN)).thenReturn(itemInfoList.iterator()); + doNothing().when(mockCrawlingTimer).record(any(Long.class), any()); + doNothing().when(mockPartitionsCreatedCounter).increment(); + doNothing().when(mockInvalidItemsCounter).increment(); + + testCrawler.crawl(leaderPartition, coordinator); + + // Verify crawl metrics + verify(mockCrawlingTimer).record(any(Long.class), any()); // crawlingTimer records crawl duration + verify(mockPartitionsCreatedCounter).increment(); // parititionsCreatedCounter increments when partition is created + verify(mockInvalidItemsCounter).increment(); // invalidPaginationItemsCounter increments for null items + } + @Test void testCrawlWithEmptyList() { String lastToken = INITIAL_TOKEN;