@@ -29,6 +29,7 @@ dependencies {
annotationProcessor 'org.projectlombok:lombok:1.18.30'

testImplementation project(':data-prepper-test:test-common')
testRuntimeOnly 'org.slf4j:slf4j-simple:2.0.7'

implementation(libs.spring.context) {
exclude group: 'commons-logging', module: 'commons-logging'
@@ -106,9 +106,9 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state

try {
String nextPageUri = null;
List<Record<Event>> records = new ArrayList<>();

do {
List<Record<Event>> records = new ArrayList<>();
AuditLogsResponse response =
service.searchAuditLogs(logType, startTime, endTime, nextPageUri);

@@ -138,18 +138,24 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state
}
}

// Write records to the buffer after processing each page of data, if there are any records to write.
if (!records.isEmpty()) {
bufferWriteLatencyTimer.record(() -> {
try {
writeRecordsWithRetry(records, buffer, acknowledgementSet);
} catch (Exception e) {
bufferWriteFailuresCounter.increment();
throw e;
}
});
}

nextPageUri = response.getNextPageUri();
} while (nextPageUri != null);

bufferWriteLatencyTimer.record(() -> {
try {
writeRecordsWithRetry(records, buffer, acknowledgementSet);
} catch (Exception e) {
bufferWriteFailuresCounter.increment();
throw e;
}
});

if (configuration.isAcknowledgments()) {
acknowledgementSet.complete();
}
} catch (Exception e) {
log.error(NOISY, "Failed to process partition for log type {} from {} to {}",
logType, startTime, endTime, e);
@@ -212,7 +218,6 @@ private void writeRecordsWithRetry(final List<Record<Event>> records,
if (configuration.isAcknowledgments()) {
records.forEach(record -> acknowledgementSet.add(record.getData()));
buffer.writeAll(records, (int) Duration.ofSeconds(BUFFER_TIMEOUT_IN_SECONDS).toMillis());
acknowledgementSet.complete();
} else {
buffer.writeAll(records, (int) Duration.ofSeconds(BUFFER_TIMEOUT_IN_SECONDS).toMillis());
}
@@ -250,4 +255,4 @@ private void writeRecordsWithRetry(final List<Record<Event>> records,
}
}
}
}
}
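For reference, a minimal standalone sketch of the per-page write pattern used in the hunk above: each page of results is written to the buffer before the next page is fetched, rather than accumulating every page and writing once at the end. The PageClient and SinkBuffer types and the drain method are hypothetical stand-ins, not the plugin's actual API.

```java
import java.util.ArrayList;
import java.util.List;

public class PerPageWriteSketch {
    interface PageClient {
        Page fetch(String pageUri);          // null means "first page"
    }

    interface SinkBuffer {
        void writeAll(List<String> records); // stand-in for Buffer.writeAll(records, timeout)
    }

    record Page(List<String> records, String nextPageUri) {}

    static void drain(PageClient client, SinkBuffer buffer) {
        String nextPageUri = null;
        do {
            List<String> records = new ArrayList<>();   // per-page batch, as in the diff above
            Page page = client.fetch(nextPageUri);
            records.addAll(page.records());

            if (!records.isEmpty()) {
                buffer.writeAll(records);               // flush after every page
            }
            nextPageUri = page.nextPageUri();
        } while (nextPageUri != null);
    }

    public static void main(String[] args) {
        // Fake client that serves two pages, then stops.
        PageClient client = uri -> uri == null
                ? new Page(List.of("event-1", "event-2"), "page-2")
                : new Page(List.of("event-3"), null);
        drain(client, batch -> System.out.println("wrote " + batch));
    }
}
```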
@@ -10,6 +10,7 @@
package org.opensearch.dataprepper.plugins.source.microsoft_office365;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.dataprepper.metrics.PluginMetrics;
@@ -26,6 +27,8 @@
import org.springframework.web.client.RestTemplate;

import javax.inject.Named;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.List;
import java.util.Map;
@@ -41,10 +44,13 @@
@Named
public class Office365RestClient {
private static final String AUDIT_LOG_FETCH_LATENCY = "auditLogFetchLatency";
private static final String SEARCH_CALL_LATENCY = "searchCallLatency";
private static final String AUDIT_LOGS_REQUESTED = "auditLogsRequested";
private static final String AUDIT_LOG_RESPONSE_SIZE = "auditLogResponseSizeBytes";
private static final String AUDIT_LOG_REQUESTS_FAILED = "auditLogRequestsFailed";
private static final String AUDIT_LOG_REQUESTS_SUCCESS = "auditLogRequestsSuccess";
private static final String AUDIT_LOGS_REQUESTED = "auditLogsRequested";
private static final String SEARCH_CALL_LATENCY = "searchCallLatency";
private static final String SEARCH_RESPONSE_SIZE = "searchResponseSizeBytes";
private static final String SEARCH_REQUESTS_SUCCESS = "searchRequestsSuccess";
private static final String SEARCH_REQUESTS_FAILED = "searchRequestsFailed";
private static final String MANAGEMENT_API_BASE_URL = "https://manage.office.com/api/v1.0/";

@@ -56,6 +62,9 @@ public class Office365RestClient {
private final Counter auditLogRequestsFailedCounter;
private final Counter auditLogRequestsSuccessCounter;
private final Counter searchRequestsFailedCounter;
private final Counter searchRequestsSuccessCounter;
private final DistributionSummary auditLogResponseSizeSummary;
private final DistributionSummary searchResponseSizeSummary;

public Office365RestClient(final Office365AuthenticationInterface authConfig,
final PluginMetrics pluginMetrics) {
@@ -67,6 +76,9 @@ public Office365RestClient(final Office365AuthenticationInterface authConfig,
this.auditLogRequestsFailedCounter = pluginMetrics.counter(AUDIT_LOG_REQUESTS_FAILED);
this.auditLogRequestsSuccessCounter = pluginMetrics.counter(AUDIT_LOG_REQUESTS_SUCCESS);
this.searchRequestsFailedCounter = pluginMetrics.counter(SEARCH_REQUESTS_FAILED);
this.searchRequestsSuccessCounter = pluginMetrics.counter(SEARCH_REQUESTS_SUCCESS);
this.auditLogResponseSizeSummary = pluginMetrics.summary(AUDIT_LOG_RESPONSE_SIZE);
this.searchResponseSizeSummary = pluginMetrics.summary(SEARCH_RESPONSE_SIZE);
}

/**
@@ -170,6 +182,8 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
new HttpEntity<>(headers),
new ParameterizedTypeReference<>() {}
);
// Record the search response size from the Content-Length header.
searchResponseSizeSummary.record(response.getHeaders().getContentLength());

// Extract NextPageUri from response headers
List<String> nextPageHeaders = response.getHeaders().get("NextPageUri");
@@ -180,6 +194,7 @@
log.debug("Next page URI found: {}", nextPageUri);
}

searchRequestsSuccessCounter.increment();
return new AuditLogsResponse(response.getBody(), nextPageUri);
},
authConfig::renewCredentials
@@ -210,12 +225,21 @@ public String getAuditLog(String contentUri) {
try {
String response = RetryHandler.executeWithRetry(() -> {
headers.setBearerAuth(authConfig.getAccessToken());
return restTemplate.exchange(
contentUri,
HttpMethod.GET,
new HttpEntity<>(headers),
String.class
).getBody();

ResponseEntity<String> responseEntity = restTemplate.exchange(
contentUri,
HttpMethod.GET,
new HttpEntity<>(headers),
String.class
);

// Record the audit log response size from the response body.
String responseBody = responseEntity.getBody();
if (responseBody != null) {
auditLogResponseSizeSummary.record(responseBody.getBytes(StandardCharsets.UTF_8).length);
}

return responseBody;
}, authConfig::renewCredentials);
auditLogRequestsSuccessCounter.increment();
return response;
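For context, a minimal sketch of the response-size metrics introduced above, using only the Micrometer core API. The registry, meter name, and sample body are illustrative; the plugin itself obtains its DistributionSummary instances from PluginMetrics.

```java
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import java.nio.charset.StandardCharsets;

public class ResponseSizeMetricSketch {
    public static void main(String[] args) {
        SimpleMeterRegistry registry = new SimpleMeterRegistry();
        DistributionSummary responseSize = DistributionSummary.builder("auditLogResponseSizeBytes")
                .baseUnit("bytes")
                .register(registry);

        // Stand-in for an HTTP response body returned by the audit endpoint.
        String responseBody = "{\"id\":\"123\",\"contentType\":\"Audit.AzureActiveDirectory\"}";
        if (responseBody != null) {
            responseSize.record(responseBody.getBytes(StandardCharsets.UTF_8).length);
        }

        System.out.printf("count=%d totalBytes=%.0f%n", responseSize.count(), responseSize.totalAmount());
    }
}
```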
@@ -307,4 +307,4 @@ void testMissingWorkloadField() throws Exception {

verify(buffer).writeAll(argThat(list -> list.isEmpty()), anyInt());
}
}
}
@@ -9,6 +9,9 @@

package org.opensearch.dataprepper.plugins.source.microsoft_office365;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -291,4 +294,147 @@ void testTokenRenewal() {
assertEquals("Bearer token-0", requestTokens.get(0), "First request should use token-0");
assertEquals("Bearer token-1", requestTokens.get(1), "Second request should use token-1");
}
}

@Test
void testMetricsInitialization() {
// Test that metrics are initialized during construction. This approach is used for metrics that are recorded
// inside RetryHandler.executeWithRetry() static calls, where verifying the invocation directly would require
// complex static mocking. Testing initialization ensures the metrics infrastructure is properly set up.

// Mock all required timers and counters for Office365RestClient constructor
PluginMetrics mockPluginMetrics = org.mockito.Mockito.mock(PluginMetrics.class);
Timer mockAuditLogFetchLatencyTimer = org.mockito.Mockito.mock(Timer.class);
Timer mockSearchCallLatencyTimer = org.mockito.Mockito.mock(Timer.class);
Counter mockAuditLogsRequestedCounter = org.mockito.Mockito.mock(Counter.class);
Counter mockAuditLogRequestsFailedCounter = org.mockito.Mockito.mock(Counter.class);
Counter mockAuditLogRequestsSuccessCounter = org.mockito.Mockito.mock(Counter.class);
Counter mockSearchRequestsFailedCounter = org.mockito.Mockito.mock(Counter.class);
Counter mockSearchRequestsSuccessCounter = org.mockito.Mockito.mock(Counter.class);
DistributionSummary mockAuditLogResponseSizeSummary = org.mockito.Mockito.mock(DistributionSummary.class);
DistributionSummary mockSearchResponseSizeSummary = org.mockito.Mockito.mock(DistributionSummary.class);

when(mockPluginMetrics.timer("auditLogFetchLatency")).thenReturn(mockAuditLogFetchLatencyTimer);
when(mockPluginMetrics.timer("searchCallLatency")).thenReturn(mockSearchCallLatencyTimer);
when(mockPluginMetrics.counter("auditLogsRequested")).thenReturn(mockAuditLogsRequestedCounter);
when(mockPluginMetrics.counter("auditLogRequestsFailed")).thenReturn(mockAuditLogRequestsFailedCounter);
when(mockPluginMetrics.counter("auditLogRequestsSuccess")).thenReturn(mockAuditLogRequestsSuccessCounter);
when(mockPluginMetrics.counter("searchRequestsFailed")).thenReturn(mockSearchRequestsFailedCounter);
when(mockPluginMetrics.counter("searchRequestsSuccess")).thenReturn(mockSearchRequestsSuccessCounter);
when(mockPluginMetrics.summary("auditLogRequestSizeBytes")).thenReturn(mockAuditLogRequestSizeSummary);
when(mockPluginMetrics.summary("searchRequestSizeBytes")).thenReturn(mockSearchRequestSizeSummary);

// Create Office365RestClient with mocked metrics
Office365RestClient testClient = new Office365RestClient(authConfig, mockPluginMetrics);

// Verify all metrics were requested during construction
verify(mockPluginMetrics).timer("auditLogFetchLatency");
verify(mockPluginMetrics).timer("searchCallLatency");
verify(mockPluginMetrics).counter("auditLogsRequested");
verify(mockPluginMetrics).counter("auditLogRequestsFailed");
verify(mockPluginMetrics).counter("auditLogRequestsSuccess");
verify(mockPluginMetrics).counter("searchRequestsFailed");
verify(mockPluginMetrics).counter("searchRequestsSuccess");
verify(mockPluginMetrics).summary("auditLogRequestSizeBytes");
verify(mockPluginMetrics).summary("searchRequestSizeBytes");
}

@Test
void testGetAuditLogMetricsInvocation() throws NoSuchFieldException, IllegalAccessException {
// Test metrics for getAuditLog() method - both success and failure scenarios

// Create mock metrics and inject them
Counter mockAuditLogsRequestedCounter = org.mockito.Mockito.mock(Counter.class);
Counter mockAuditLogRequestsFailedCounter = org.mockito.Mockito.mock(Counter.class);
Timer mockAuditLogFetchLatencyTimer = org.mockito.Mockito.mock(Timer.class);
DistributionSummary mockAuditLogResponseSizeSummary = org.mockito.Mockito.mock(DistributionSummary.class);

ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "auditLogsRequestedCounter", mockAuditLogsRequestedCounter);
ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "auditLogRequestsFailedCounter", mockAuditLogRequestsFailedCounter);
ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "auditLogFetchLatencyTimer", mockAuditLogFetchLatencyTimer);
ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "auditLogRequestSizeSummary", mockAuditLogRequestSizeSummary);

// Mock timer.record() to execute the lambda
when(mockAuditLogFetchLatencyTimer.record(any(java.util.function.Supplier.class))).thenAnswer(invocation -> {
java.util.function.Supplier<?> supplier = invocation.getArgument(0);
return supplier.get();
});

String contentUri = "https://manage.office.com/api/v1.0/test-tenant/activity/feed/audit/123";

// Test success scenario
String mockAuditLog = "{\"id\":\"123\",\"contentType\":\"Audit.AzureActiveDirectory\"}";
ResponseEntity<String> mockResponse = new ResponseEntity<>(mockAuditLog, HttpStatus.OK);
when(restTemplate.exchange(eq(contentUri), eq(HttpMethod.GET), any(), eq(String.class)))
.thenReturn(mockResponse);

office365RestClient.getAuditLog(contentUri);

// Verify success metrics
verify(mockAuditLogsRequestedCounter).increment(); // Called directly before RetryHandler
verify(mockAuditLogFetchLatencyTimer).record(any(java.util.function.Supplier.class)); // Timer wrapper
verify(mockAuditLogResponseSizeSummary).record(mockAuditLog.getBytes(java.nio.charset.StandardCharsets.UTF_8).length); // Size metric recorded inside RetryHandler

// Test failure scenario
when(restTemplate.exchange(eq(contentUri), eq(HttpMethod.GET), any(), eq(String.class)))
.thenThrow(new HttpClientErrorException(HttpStatus.INTERNAL_SERVER_ERROR));

assertThrows(RuntimeException.class, () -> office365RestClient.getAuditLog(contentUri));

// Verify failure metrics
verify(mockAuditLogsRequestedCounter, times(2)).increment(); // Called again before retry
verify(mockAuditLogRequestsFailedCounter).increment(); // Called in catch block outside RetryHandler
}

@Test
void testSearchAuditLogsMetricsInvocation() throws NoSuchFieldException, IllegalAccessException {
// Test metrics for searchAuditLogs() method - both success and failure scenarios

// Create mock metrics and inject them
Counter mockSearchRequestsFailedCounter = org.mockito.Mockito.mock(Counter.class);
Timer mockSearchCallLatencyTimer = org.mockito.Mockito.mock(Timer.class);
DistributionSummary mockSearchResponseSizeSummary = org.mockito.Mockito.mock(DistributionSummary.class);

ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "searchRequestsFailedCounter", mockSearchRequestsFailedCounter);
ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "searchCallLatencyTimer", mockSearchCallLatencyTimer);
ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "searchRequestSizeSummary", mockSearchRequestSizeSummary);

// Mock timer.record() to execute the lambda
when(mockSearchCallLatencyTimer.record(any(java.util.function.Supplier.class))).thenAnswer(invocation -> {
java.util.function.Supplier<?> supplier = invocation.getArgument(0);
return supplier.get();
});

// Test success scenario
List<Map<String, Object>> mockResults = Collections.singletonList(new HashMap<>());
HttpHeaders responseHeaders = new HttpHeaders();
responseHeaders.setContentLength(1024L); // Mock content length
ResponseEntity<List<Map<String, Object>>> mockResponse = new ResponseEntity<>(mockResults, responseHeaders, HttpStatus.OK);
when(restTemplate.exchange(anyString(), eq(HttpMethod.GET), any(), any(ParameterizedTypeReference.class)))
.thenReturn(mockResponse);

office365RestClient.searchAuditLogs(
"Audit.AzureActiveDirectory",
Instant.now().minus(1, ChronoUnit.HOURS),
Instant.now(),
null
);

// Verify success metrics
verify(mockSearchCallLatencyTimer).record(any(java.util.function.Supplier.class)); // Timer wrapper
verify(mockSearchResponseSizeSummary).record(1024L); // Size metric recorded inside RetryHandler

// Test failure scenario
when(restTemplate.exchange(anyString(), eq(HttpMethod.GET), any(), any(ParameterizedTypeReference.class)))
.thenThrow(new HttpClientErrorException(HttpStatus.INTERNAL_SERVER_ERROR));

assertThrows(RuntimeException.class, () -> office365RestClient.searchAuditLogs(
"Audit.AzureActiveDirectory",
Instant.now().minus(1, ChronoUnit.HOURS),
Instant.now(),
null
));

// Verify failure metrics
verify(mockSearchRequestsFailedCounter).increment(); // Called in catch block outside RetryHandler
}
}
@@ -33,7 +33,11 @@ public class TokenPaginationCrawler implements Crawler<PaginationCrawlerWorkerPr
private static final int batchSize = 50;
private static final String PAGINATION_WORKER_PARTITIONS_CREATED = "paginationWorkerPartitionsCreated";
private static final String INVALID_PAGINATION_ITEMS = "invalidPaginationItems";
private static final String WORKER_PARTITION_WAIT_TIME = "WorkerPartitionWaitTime";
private static final String WORKER_PARTITION_PROCESS_LATENCY = "WorkerPartitionProcessLatency";
private final Timer crawlingTimer;
private final Timer partitionWaitTimeTimer;
private final Timer partitionProcessLatencyTimer;
private final CrawlerClient client;
private final Counter parititionsCreatedCounter;
private final Counter invalidPaginationItemsCounter;
@@ -43,6 +47,8 @@ public TokenPaginationCrawler(CrawlerClient client,
this.client = client;
this.crawlingTimer = pluginMetrics.timer("crawlingTime");
this.parititionsCreatedCounter = pluginMetrics.counter(PAGINATION_WORKER_PARTITIONS_CREATED);
this.partitionWaitTimeTimer = pluginMetrics.timer(WORKER_PARTITION_WAIT_TIME);
this.partitionProcessLatencyTimer = pluginMetrics.timer(WORKER_PARTITION_PROCESS_LATENCY);
this.invalidPaginationItemsCounter = pluginMetrics.counter(INVALID_PAGINATION_ITEMS);

}
@@ -92,7 +98,8 @@ public Instant crawl(LeaderPartition leaderPartition,
}

public void executePartition(PaginationCrawlerWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) {
client.executePartition(state, buffer, acknowledgementSet);
partitionWaitTimeTimer.record(Duration.between(state.getExportStartTime(), Instant.now()));
partitionProcessLatencyTimer.record(() -> client.executePartition(state, buffer, acknowledgementSet));
}

private void updateLeaderProgressState(LeaderPartition leaderPartition, String updatedToken, EnhancedSourceCoordinator coordinator) {
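For context, a minimal sketch of the two timers added above: wait time is recorded as the elapsed Duration between the partition's export start time and pickup, and processing latency wraps the work itself. The registry, meter names, and doWork() method are illustrative stand-ins.

```java
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

public class PartitionTimingSketch {
    public static void main(String[] args) {
        SimpleMeterRegistry registry = new SimpleMeterRegistry();
        Timer waitTime = registry.timer("WorkerPartitionWaitTime");
        Timer processLatency = registry.timer("WorkerPartitionProcessLatency");

        // Stand-in for state.getExportStartTime(): when the partition became available.
        Instant exportStartTime = Instant.now().minusSeconds(5);

        // Time spent waiting between partition creation and pickup by a worker.
        waitTime.record(Duration.between(exportStartTime, Instant.now()));

        // Time spent actually processing the partition.
        processLatency.record(PartitionTimingSketch::doWork);

        System.out.printf("wait=%.0fms process=%.0fms%n",
                waitTime.totalTime(TimeUnit.MILLISECONDS),
                processLatency.totalTime(TimeUnit.MILLISECONDS));
    }

    private static void doWork() {
        try {
            Thread.sleep(50);                            // simulated partition work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```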