diff --git a/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java b/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java
index 7f2c4905c9a73..020586c2d6d15 100644
--- a/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java
+++ b/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java
@@ -19,9 +19,11 @@
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
/**
*
An interface for enforcing a policy on alter configs requests.
@@ -42,6 +44,7 @@ class RequestMetadata {
private final ConfigResource resource;
private final Map configs;
+ private final KafkaPrincipal principal;
/**
* Create an instance of this class with the provided parameters.
@@ -49,8 +52,23 @@ class RequestMetadata {
* This constructor is public to make testing of AlterConfigPolicy implementations easier.
*/
public RequestMetadata(ConfigResource resource, Map configs) {
+ this(resource, configs, null);
+ }
+
+ /**
+ * Create an instance of this class with the provided parameters, including the principal that initiated the
+ * request.
+ *
+ * This constructor is public to make testing of AlterConfigPolicy implementations easier.
+ *
+ * @param resource the resource whose configs are being altered.
+ * @param configs the configs in the request.
+ * @param principal the authenticated principal that initiated the request, or null if not available.
+ */
+ public RequestMetadata(ConfigResource resource, Map configs, KafkaPrincipal principal) {
this.resource = resource;
this.configs = configs;
+ this.principal = principal;
}
/**
@@ -64,6 +82,16 @@ public ConfigResource resource() {
return resource;
}
+ /**
+ * Return the authenticated principal that initiated the request, if available. This may be empty when the
+ * metadata is constructed without a principal (for example, in tests or via the legacy constructor).
+ */
+ public Optional principal() {
+ return Optional.ofNullable(principal);
+ }
+
+ // The principal is intentionally excluded from equals/hashCode: it is request-scoped metadata about who
+ // initiated the request, not part of the identity of the requested config change.
@Override
public int hashCode() {
return Objects.hash(resource, configs);
@@ -80,7 +108,8 @@ public boolean equals(Object o) {
@Override
public String toString() {
return "AlterConfigPolicy.RequestMetadata(resource=" + resource +
- ", configs=" + configs + ")";
+ ", configs=" + configs +
+ ", principal=" + principal + ")";
}
}
diff --git a/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java b/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
index 4d8c7ad2706c9..e4f0624fc14cf 100644
--- a/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
+++ b/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
@@ -18,11 +18,13 @@
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
/**
* An interface for enforcing a policy on create topics requests.
@@ -45,6 +47,7 @@ class RequestMetadata {
private final Short replicationFactor;
private final Map> replicasAssignments;
private final Map configs;
+ private final KafkaPrincipal principal;
/**
* Create an instance of this class with the provided parameters.
@@ -61,11 +64,33 @@ class RequestMetadata {
*/
public RequestMetadata(String topic, Integer numPartitions, Short replicationFactor,
Map> replicasAssignments, Map configs) {
+ this(topic, numPartitions, replicationFactor, replicasAssignments, configs, null);
+ }
+
+ /**
+ * Create an instance of this class with the provided parameters, including the principal that initiated the
+ * request.
+ *
+ * This constructor is public to make testing of CreateTopicPolicy implementations easier.
+ *
+ * @param topic the name of the topic to create.
+ * @param numPartitions the number of partitions to create or null if replicasAssignments is set.
+ * @param replicationFactor the replication factor for the topic or null if replicaAssignments is set.
+ * @param replicasAssignments replica assignments or null if numPartitions and replicationFactor is set. The
+ * assignment is a map from partition id to replica (broker) ids.
+ * @param configs topic configs for the topic to be created, not including broker defaults. Broker configs are
+ * passed via the {@code configure()} method of the policy implementation.
+ * @param principal the authenticated principal that initiated the request, or null if not available.
+ */
+ public RequestMetadata(String topic, Integer numPartitions, Short replicationFactor,
+ Map> replicasAssignments, Map configs,
+ KafkaPrincipal principal) {
this.topic = topic;
this.numPartitions = numPartitions;
this.replicationFactor = replicationFactor;
this.replicasAssignments = replicasAssignments == null ? null : Collections.unmodifiableMap(replicasAssignments);
this.configs = Collections.unmodifiableMap(configs);
+ this.principal = principal;
}
/**
@@ -105,6 +130,16 @@ public Map configs() {
return configs;
}
+ /**
+ * Return the authenticated principal that initiated the request, if available. This may be empty when the
+ * metadata is constructed without a principal (for example, in tests or via the legacy constructor).
+ */
+ public Optional principal() {
+ return Optional.ofNullable(principal);
+ }
+
+ // The principal is intentionally excluded from equals/hashCode: it is request-scoped metadata about who
+ // initiated the request, not part of the identity of the requested topic.
@Override
public int hashCode() {
return Objects.hash(topic, numPartitions, replicationFactor,
@@ -129,7 +164,8 @@ public String toString() {
", numPartitions=" + numPartitions +
", replicationFactor=" + replicationFactor +
", replicasAssignments=" + replicasAssignments +
- ", configs=" + configs + ")";
+ ", configs=" + configs +
+ ", principal=" + principal + ")";
}
}
diff --git a/clients/src/test/java/org/apache/kafka/server/policy/AlterConfigPolicyTest.java b/clients/src/test/java/org/apache/kafka/server/policy/AlterConfigPolicyTest.java
index 5a6d8b291b0ba..88de4a9da06d8 100644
--- a/clients/src/test/java/org/apache/kafka/server/policy/AlterConfigPolicyTest.java
+++ b/clients/src/test/java/org/apache/kafka/server/policy/AlterConfigPolicyTest.java
@@ -18,14 +18,17 @@
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigResource.Type;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata;
import org.junit.jupiter.api.Test;
import java.util.Collections;
+import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class AlterConfigPolicyTest {
@@ -49,4 +52,51 @@ public void testRequestMetadataEquals() {
Collections.emptyMap()
));
}
+
+ @Test
+ public void testPrincipalDefaultsToEmpty() {
+ RequestMetadata requestMetadata = new RequestMetadata(
+ new ConfigResource(Type.BROKER, "0"),
+ Collections.singletonMap("foo", "bar")
+ );
+ assertEquals(Optional.empty(), requestMetadata.principal());
+ }
+
+ @Test
+ public void testPrincipalIsExposed() {
+ KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice");
+ RequestMetadata requestMetadata = new RequestMetadata(
+ new ConfigResource(Type.BROKER, "0"),
+ Collections.singletonMap("foo", "bar"),
+ principal
+ );
+ assertEquals(Optional.of(principal), requestMetadata.principal());
+ }
+
+ @Test
+ public void testPrincipalExcludedFromEqualsAndHashCode() {
+ RequestMetadata withoutPrincipal = new RequestMetadata(
+ new ConfigResource(Type.BROKER, "0"),
+ Collections.singletonMap("foo", "bar")
+ );
+ RequestMetadata withPrincipal = new RequestMetadata(
+ new ConfigResource(Type.BROKER, "0"),
+ Collections.singletonMap("foo", "bar"),
+ new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice")
+ );
+ // The principal is request-scoped metadata and must not affect equality of the requested change.
+ assertEquals(withoutPrincipal, withPrincipal);
+ assertEquals(withoutPrincipal.hashCode(), withPrincipal.hashCode());
+ }
+
+ @Test
+ public void testToStringIncludesPrincipal() {
+ RequestMetadata requestMetadata = new RequestMetadata(
+ new ConfigResource(Type.BROKER, "0"),
+ Collections.singletonMap("foo", "bar"),
+ new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice")
+ );
+ assertTrue(requestMetadata.toString().contains("principal=User:alice"),
+ "Expected toString to contain the principal, but was: " + requestMetadata);
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/server/policy/CreateTopicPolicyTest.java b/clients/src/test/java/org/apache/kafka/server/policy/CreateTopicPolicyTest.java
new file mode 100644
index 0000000000000..35b7058c18ce9
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/server/policy/CreateTopicPolicyTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.policy;
+
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class CreateTopicPolicyTest {
+
+ @Test
+ public void testPrincipalDefaultsToEmpty() {
+ RequestMetadata requestMetadata = new RequestMetadata(
+ "topic", 1, (short) 1, null, Map.of());
+ assertEquals(Optional.empty(), requestMetadata.principal());
+ }
+
+ @Test
+ public void testPrincipalIsExposed() {
+ KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice");
+ RequestMetadata requestMetadata = new RequestMetadata(
+ "topic", 1, (short) 1, null, Map.of(), principal);
+ assertEquals(Optional.of(principal), requestMetadata.principal());
+ }
+
+ @Test
+ public void testPrincipalExcludedFromEqualsAndHashCode() {
+ RequestMetadata withoutPrincipal = new RequestMetadata(
+ "topic", 1, (short) 1, null, Map.of());
+ RequestMetadata withPrincipal = new RequestMetadata(
+ "topic", 1, (short) 1, null, Map.of(),
+ new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice"));
+ // The principal is request-scoped metadata and must not affect equality of the requested topic.
+ assertEquals(withoutPrincipal, withPrincipal);
+ assertEquals(withoutPrincipal.hashCode(), withPrincipal.hashCode());
+ }
+
+ @Test
+ public void testToStringIncludesPrincipal() {
+ RequestMetadata requestMetadata = new RequestMetadata(
+ "topic", 1, (short) 1, null, Map.of(),
+ new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice"));
+ assertTrue(requestMetadata.toString().contains("principal=User:alice"),
+ "Expected toString to contain the principal, but was: " + requestMetadata);
+ }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 3ff214dfc70e2..e843df3ce4221 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -29,6 +29,7 @@
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.internals.LogContext;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.SupportedConfigChecker;
@@ -214,7 +215,8 @@ SnapshotRegistry snapshotRegistry() {
ControllerResult