diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/PropertyMapEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/PropertyMapEventListener.java index 63311fc035..1fd95777c4 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/PropertyMapEventListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/PropertyMapEventListener.java @@ -19,6 +19,7 @@ package org.apache.polaris.service.events.jsonEventListener; +import jakarta.annotation.Nullable; import java.util.HashMap; import org.apache.polaris.service.events.IcebergRestCatalogEvents; import org.apache.polaris.service.events.listeners.PolarisEventListener; @@ -33,6 +34,9 @@ public abstract class PropertyMapEventListener implements PolarisEventListener { protected abstract void transformAndSendEvent(HashMap properties); + @Nullable + protected abstract String getRequestId(); + @Override public void onAfterRefreshTable(IcebergRestCatalogEvents.AfterRefreshTableEvent event) { HashMap properties = new HashMap<>(); diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListener.java index 87cf70a2f1..10be34e2e0 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListener.java @@ -19,13 +19,17 @@ package org.apache.polaris.service.events.jsonEventListener.aws.cloudwatch; +import static org.apache.polaris.service.tracing.RequestIdFilter.REQUEST_ID_KEY; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.Nullable; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; import jakarta.ws.rs.core.Context; import jakarta.ws.rs.core.SecurityContext; import java.time.Clock; @@ -67,6 +71,8 @@ public class AwsCloudWatchEventListener extends PropertyMapEventListener { @Context SecurityContext securityContext; + @Context ContainerRequestContext requestContext; + @Inject public AwsCloudWatchEventListener( AwsCloudWatchConfiguration config, @@ -143,6 +149,12 @@ private static void ensureResourceExists( } } + @Nullable + @Override + protected String getRequestId() { + return (String) requestContext.getProperty(REQUEST_ID_KEY); + } + @PreDestroy void shutdown() { if (client != null) { @@ -157,7 +169,10 @@ protected void transformAndSendEvent(HashMap properties) { properties.put("principal", securityContext.getUserPrincipal().getName()); properties.put( "activated_roles", ((PolarisPrincipal) securityContext.getUserPrincipal()).getRoles()); - // TODO: Add request ID when it is available + String requestId = getRequestId(); + if (requestId != null) { + properties.put("request_id", requestId); + } String eventAsJson; try { eventAsJson = objectMapper.writeValueAsString(properties); diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListenerTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListenerTest.java index 3aac097b17..d29aeb731c 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListenerTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListenerTest.java @@ -20,11 +20,13 @@ package org.apache.polaris.service.events.jsonEventListener.aws.cloudwatch; import static org.apache.polaris.containerspec.ContainerSpecHelper.containerSpecHelper; +import static org.apache.polaris.service.tracing.RequestIdFilter.REQUEST_ID_KEY; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.PropertyNamingStrategies; import io.quarkus.runtime.configuration.MemorySize; +import jakarta.ws.rs.container.ContainerRequestContext; import jakarta.ws.rs.core.SecurityContext; import java.math.BigInteger; import java.time.Clock; @@ -77,6 +79,7 @@ class AwsCloudWatchEventListenerTest { private static final String LOG_STREAM = "test-log-stream"; private static final String REALM = "test-realm"; private static final String TEST_USER = "test-user"; + private static final String TEST_REQUEST_ID = "test-request-id-12345"; private static final Clock clock = Clock.systemUTC(); private static final BigInteger MAX_BODY_SIZE = BigInteger.valueOf(1024 * 1024); private static final PolarisIcebergObjectMapperCustomizer customizer = @@ -143,14 +146,17 @@ protected CloudWatchLogsAsyncClient createCloudWatchAsyncClient() { RealmContext realmContext = Mockito.mock(RealmContext.class); SecurityContext securityContext = Mockito.mock(SecurityContext.class); PolarisPrincipal principal = Mockito.mock(PolarisPrincipal.class); + ContainerRequestContext requestContext = Mockito.mock(ContainerRequestContext.class); when(callContext.getRealmContext()).thenReturn(realmContext); when(callContext.getPolarisCallContext()).thenReturn(polarisCallContext); when(realmContext.getRealmIdentifier()).thenReturn(REALM); when(securityContext.getUserPrincipal()).thenReturn(principal); when(principal.getName()).thenReturn(TEST_USER); when(principal.getRoles()).thenReturn(Set.of("role1", "role2")); + when(requestContext.getProperty(REQUEST_ID_KEY)).thenReturn(TEST_REQUEST_ID); listener.callContext = callContext; listener.securityContext = securityContext; + listener.requestContext = requestContext; return listener; } @@ -242,6 +248,7 @@ void shouldSendEventToCloudWatch() { IcebergRestCatalogEvents.AfterRefreshTableEvent.class.getSimpleName()); assertThat(message).contains(TEST_USER); assertThat(message).contains(testTable.toString()); + assertThat(message).contains(TEST_REQUEST_ID); }); } finally { // Clean up