From 34ef3670079d6f3d0182d5ec0aed66d7f1dcecbe Mon Sep 17 00:00:00 2001 From: Dan O'Reilly Date: Mon, 15 Jun 2026 17:02:45 -0700 Subject: [PATCH] KAFKA-15701: Expose the requesting principal to CreateTopicPolicy and AlterConfigPolicy The CreateTopicPolicy and AlterConfigPolicy plugin interfaces let operators constrain the *shape* of a request (topic name, partitions, replication factor, configs) but not *who* issued it. In multi-tenant clusters this prevents a policy from enforcing per-principal rules that prefixed ACLs cannot express, e.g. "tenant-a may only create topics with replication.factor <= 3" or per-principal topic/partition quotas. This change exposes the authenticated KafkaPrincipal to both policies: * CreateTopicPolicy.RequestMetadata and AlterConfigPolicy.RequestMetadata gain an overloaded constructor that accepts a KafkaPrincipal and an Optional principal() accessor. The existing constructors are retained and delegate with a null principal, so the change is source- and binary-compatible for existing policy implementations. The principal is excluded from equals()/hashCode() (it is request-scoped metadata, not part of the identity of the requested change) and included in toString(). * The KRaft controller populates the principal from ControllerRequestContext.principal(): ReplicationControlManager passes it to CreateTopicPolicy, and it is threaded through ConfigurationControlManager (alongside the existing 'forwarded' parameter) to AlterConfigPolicy for both the incremental and legacy alter-configs paths. The legacy ZooKeeper path is intentionally out of scope. Policies constructed without a principal (including via the legacy constructors) observe Optional.empty(). NOTE: This is a public-interface change and therefore requires a KIP before it can be merged upstream. This commit is the implementation/proof-of-concept to support that discussion; see KAFKA-15701. --- .../server/policy/AlterConfigPolicy.java | 31 +++++- .../server/policy/CreateTopicPolicy.java | 38 ++++++- .../server/policy/AlterConfigPolicyTest.java | 50 +++++++++ .../server/policy/CreateTopicPolicyTest.java | 67 ++++++++++++ .../ConfigurationControlManager.java | 34 +++--- .../kafka/controller/QuorumController.java | 4 +- .../controller/ReplicationControlManager.java | 6 +- .../ConfigurationControlManagerTest.java | 102 ++++++++++++++---- .../controller/QuorumControllerTest.java | 4 +- .../ReplicationControlManagerTest.java | 48 ++++++++- 10 files changed, 337 insertions(+), 47 deletions(-) create mode 100644 clients/src/test/java/org/apache/kafka/server/policy/CreateTopicPolicyTest.java diff --git a/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java b/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java index 7f2c4905c9a73..020586c2d6d15 100644 --- a/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java +++ b/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java @@ -19,9 +19,11 @@ import org.apache.kafka.common.Configurable; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.PolicyViolationException; +import org.apache.kafka.common.security.auth.KafkaPrincipal; import java.util.Map; import java.util.Objects; +import java.util.Optional; /** *

An interface for enforcing a policy on alter configs requests. @@ -42,6 +44,7 @@ class RequestMetadata { private final ConfigResource resource; private final Map configs; + private final KafkaPrincipal principal; /** * Create an instance of this class with the provided parameters. @@ -49,8 +52,23 @@ class RequestMetadata { * This constructor is public to make testing of AlterConfigPolicy implementations easier. */ public RequestMetadata(ConfigResource resource, Map configs) { + this(resource, configs, null); + } + + /** + * Create an instance of this class with the provided parameters, including the principal that initiated the + * request. + * + * This constructor is public to make testing of AlterConfigPolicy implementations easier. + * + * @param resource the resource whose configs are being altered. + * @param configs the configs in the request. + * @param principal the authenticated principal that initiated the request, or null if not available. + */ + public RequestMetadata(ConfigResource resource, Map configs, KafkaPrincipal principal) { this.resource = resource; this.configs = configs; + this.principal = principal; } /** @@ -64,6 +82,16 @@ public ConfigResource resource() { return resource; } + /** + * Return the authenticated principal that initiated the request, if available. This may be empty when the + * metadata is constructed without a principal (for example, in tests or via the legacy constructor). + */ + public Optional principal() { + return Optional.ofNullable(principal); + } + + // The principal is intentionally excluded from equals/hashCode: it is request-scoped metadata about who + // initiated the request, not part of the identity of the requested config change. @Override public int hashCode() { return Objects.hash(resource, configs); @@ -80,7 +108,8 @@ public boolean equals(Object o) { @Override public String toString() { return "AlterConfigPolicy.RequestMetadata(resource=" + resource + - ", configs=" + configs + ")"; + ", configs=" + configs + + ", principal=" + principal + ")"; } } diff --git a/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java b/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java index 4d8c7ad2706c9..e4f0624fc14cf 100644 --- a/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java +++ b/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java @@ -18,11 +18,13 @@ import org.apache.kafka.common.Configurable; import org.apache.kafka.common.errors.PolicyViolationException; +import org.apache.kafka.common.security.auth.KafkaPrincipal; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; /** *

An interface for enforcing a policy on create topics requests. @@ -45,6 +47,7 @@ class RequestMetadata { private final Short replicationFactor; private final Map> replicasAssignments; private final Map configs; + private final KafkaPrincipal principal; /** * Create an instance of this class with the provided parameters. @@ -61,11 +64,33 @@ class RequestMetadata { */ public RequestMetadata(String topic, Integer numPartitions, Short replicationFactor, Map> replicasAssignments, Map configs) { + this(topic, numPartitions, replicationFactor, replicasAssignments, configs, null); + } + + /** + * Create an instance of this class with the provided parameters, including the principal that initiated the + * request. + * + * This constructor is public to make testing of CreateTopicPolicy implementations easier. + * + * @param topic the name of the topic to create. + * @param numPartitions the number of partitions to create or null if replicasAssignments is set. + * @param replicationFactor the replication factor for the topic or null if replicaAssignments is set. + * @param replicasAssignments replica assignments or null if numPartitions and replicationFactor is set. The + * assignment is a map from partition id to replica (broker) ids. + * @param configs topic configs for the topic to be created, not including broker defaults. Broker configs are + * passed via the {@code configure()} method of the policy implementation. + * @param principal the authenticated principal that initiated the request, or null if not available. + */ + public RequestMetadata(String topic, Integer numPartitions, Short replicationFactor, + Map> replicasAssignments, Map configs, + KafkaPrincipal principal) { this.topic = topic; this.numPartitions = numPartitions; this.replicationFactor = replicationFactor; this.replicasAssignments = replicasAssignments == null ? null : Collections.unmodifiableMap(replicasAssignments); this.configs = Collections.unmodifiableMap(configs); + this.principal = principal; } /** @@ -105,6 +130,16 @@ public Map configs() { return configs; } + /** + * Return the authenticated principal that initiated the request, if available. This may be empty when the + * metadata is constructed without a principal (for example, in tests or via the legacy constructor). + */ + public Optional principal() { + return Optional.ofNullable(principal); + } + + // The principal is intentionally excluded from equals/hashCode: it is request-scoped metadata about who + // initiated the request, not part of the identity of the requested topic. @Override public int hashCode() { return Objects.hash(topic, numPartitions, replicationFactor, @@ -129,7 +164,8 @@ public String toString() { ", numPartitions=" + numPartitions + ", replicationFactor=" + replicationFactor + ", replicasAssignments=" + replicasAssignments + - ", configs=" + configs + ")"; + ", configs=" + configs + + ", principal=" + principal + ")"; } } diff --git a/clients/src/test/java/org/apache/kafka/server/policy/AlterConfigPolicyTest.java b/clients/src/test/java/org/apache/kafka/server/policy/AlterConfigPolicyTest.java index 5a6d8b291b0ba..88de4a9da06d8 100644 --- a/clients/src/test/java/org/apache/kafka/server/policy/AlterConfigPolicyTest.java +++ b/clients/src/test/java/org/apache/kafka/server/policy/AlterConfigPolicyTest.java @@ -18,14 +18,17 @@ import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.ConfigResource.Type; +import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata; import org.junit.jupiter.api.Test; import java.util.Collections; +import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; public class AlterConfigPolicyTest { @@ -49,4 +52,51 @@ public void testRequestMetadataEquals() { Collections.emptyMap() )); } + + @Test + public void testPrincipalDefaultsToEmpty() { + RequestMetadata requestMetadata = new RequestMetadata( + new ConfigResource(Type.BROKER, "0"), + Collections.singletonMap("foo", "bar") + ); + assertEquals(Optional.empty(), requestMetadata.principal()); + } + + @Test + public void testPrincipalIsExposed() { + KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice"); + RequestMetadata requestMetadata = new RequestMetadata( + new ConfigResource(Type.BROKER, "0"), + Collections.singletonMap("foo", "bar"), + principal + ); + assertEquals(Optional.of(principal), requestMetadata.principal()); + } + + @Test + public void testPrincipalExcludedFromEqualsAndHashCode() { + RequestMetadata withoutPrincipal = new RequestMetadata( + new ConfigResource(Type.BROKER, "0"), + Collections.singletonMap("foo", "bar") + ); + RequestMetadata withPrincipal = new RequestMetadata( + new ConfigResource(Type.BROKER, "0"), + Collections.singletonMap("foo", "bar"), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice") + ); + // The principal is request-scoped metadata and must not affect equality of the requested change. + assertEquals(withoutPrincipal, withPrincipal); + assertEquals(withoutPrincipal.hashCode(), withPrincipal.hashCode()); + } + + @Test + public void testToStringIncludesPrincipal() { + RequestMetadata requestMetadata = new RequestMetadata( + new ConfigResource(Type.BROKER, "0"), + Collections.singletonMap("foo", "bar"), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice") + ); + assertTrue(requestMetadata.toString().contains("principal=User:alice"), + "Expected toString to contain the principal, but was: " + requestMetadata); + } } diff --git a/clients/src/test/java/org/apache/kafka/server/policy/CreateTopicPolicyTest.java b/clients/src/test/java/org/apache/kafka/server/policy/CreateTopicPolicyTest.java new file mode 100644 index 0000000000000..35b7058c18ce9 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/server/policy/CreateTopicPolicyTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.policy; + +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata; + +import org.junit.jupiter.api.Test; + +import java.util.Map; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class CreateTopicPolicyTest { + + @Test + public void testPrincipalDefaultsToEmpty() { + RequestMetadata requestMetadata = new RequestMetadata( + "topic", 1, (short) 1, null, Map.of()); + assertEquals(Optional.empty(), requestMetadata.principal()); + } + + @Test + public void testPrincipalIsExposed() { + KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice"); + RequestMetadata requestMetadata = new RequestMetadata( + "topic", 1, (short) 1, null, Map.of(), principal); + assertEquals(Optional.of(principal), requestMetadata.principal()); + } + + @Test + public void testPrincipalExcludedFromEqualsAndHashCode() { + RequestMetadata withoutPrincipal = new RequestMetadata( + "topic", 1, (short) 1, null, Map.of()); + RequestMetadata withPrincipal = new RequestMetadata( + "topic", 1, (short) 1, null, Map.of(), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice")); + // The principal is request-scoped metadata and must not affect equality of the requested topic. + assertEquals(withoutPrincipal, withPrincipal); + assertEquals(withoutPrincipal.hashCode(), withPrincipal.hashCode()); + } + + @Test + public void testToStringIncludesPrincipal() { + RequestMetadata requestMetadata = new RequestMetadata( + "topic", 1, (short) 1, null, Map.of(), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice")); + assertTrue(requestMetadata.toString().contains("principal=User:alice"), + "Expected toString to contain the principal, but was: " + requestMetadata); + } +} diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index 3ff214dfc70e2..e843df3ce4221 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.utils.internals.LogContext; import org.apache.kafka.metadata.KafkaConfigSchema; import org.apache.kafka.metadata.SupportedConfigChecker; @@ -214,7 +215,8 @@ SnapshotRegistry snapshotRegistry() { ControllerResult> incrementalAlterConfigs( Map>> configChanges, boolean newlyCreatedResource, - boolean forwarded + boolean forwarded, + KafkaPrincipal principal ) { List outputRecords = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP); @@ -225,7 +227,8 @@ ControllerResult> incrementalAlterConfigs( resourceEntry.getValue(), newlyCreatedResource, outputRecords, - forwarded); + forwarded, + principal); outputResults.put(resourceEntry.getKey(), apiError); } outputRecords.addAll(createClearElrRecordsAsNeeded(outputRecords)); @@ -258,7 +261,8 @@ ControllerResult incrementalAlterConfig( ConfigResource configResource, Map> keyToOps, boolean newlyCreatedResource, - boolean forwarded + boolean forwarded, + KafkaPrincipal principal ) { List outputRecords = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP); @@ -266,7 +270,8 @@ ControllerResult incrementalAlterConfig( keyToOps, newlyCreatedResource, outputRecords, - forwarded); + forwarded, + principal); outputRecords.addAll(createClearElrRecordsAsNeeded(outputRecords)); return ControllerResult.atomicOf(outputRecords, apiError); @@ -277,7 +282,8 @@ private ApiError incrementalAlterConfigResource( Map> keysToOps, boolean newlyCreatedResource, List outputRecords, - boolean forwarded + boolean forwarded, + KafkaPrincipal principal ) { List newRecords = new ArrayList<>(); for (Entry> keysToOpsEntry : keysToOps.entrySet()) { @@ -329,7 +335,7 @@ private ApiError incrementalAlterConfigResource( setValue(newValue), (short) 0)); } } - ApiError error = validateAlterConfig(configResource, newRecords, List.of(), newlyCreatedResource, forwarded); + ApiError error = validateAlterConfig(configResource, newRecords, List.of(), newlyCreatedResource, forwarded, principal); if (error.isFailure()) { return error; } @@ -342,7 +348,8 @@ private ApiError validateAlterConfig( List recordsExplicitlyAltered, List recordsImplicitlyDeleted, boolean newlyCreatedResource, - boolean forwarded + boolean forwarded, + KafkaPrincipal principal ) { Map allConfigs = new HashMap<>(); Map existingConfigsMap = new HashMap<>(); @@ -394,7 +401,7 @@ private ApiError validateAlterConfig( if (!newlyCreatedResource) { existenceChecker.accept(configResource); } - alterConfigPolicy.ifPresent(policy -> policy.validate(new RequestMetadata(configResource, alteredConfigsForAlterConfigPolicyCheck))); + alterConfigPolicy.ifPresent(policy -> policy.validate(new RequestMetadata(configResource, alteredConfigsForAlterConfigPolicyCheck, principal))); } catch (ConfigException e) { return new ApiError(INVALID_CONFIG, e.getMessage()); } catch (Throwable e) { @@ -497,7 +504,8 @@ boolean isDisallowedClusterMinIsrTransition(ConfigRecord configRecord) { ControllerResult> legacyAlterConfigs( Map> newConfigs, boolean newlyCreatedResource, - boolean forwarded + boolean forwarded, + KafkaPrincipal principal ) { List outputRecords = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP); @@ -509,7 +517,8 @@ ControllerResult> legacyAlterConfigs( newlyCreatedResource, outputRecords, outputResults, - forwarded); + forwarded, + principal); } outputRecords.addAll(createClearElrRecordsAsNeeded(outputRecords)); return ControllerResult.atomicOf(outputRecords, outputResults); @@ -520,7 +529,8 @@ private void legacyAlterConfigResource(ConfigResource configResource, boolean newlyCreatedResource, List outputRecords, Map outputResults, - boolean forwarded) { + boolean forwarded, + KafkaPrincipal principal) { List recordsExplicitlyAltered = new ArrayList<>(); Map currentConfigs = configData.get(configResource); if (currentConfigs == null) { @@ -549,7 +559,7 @@ private void legacyAlterConfigResource(ConfigResource configResource, setValue(null), (short) 0)); } } - ApiError error = validateAlterConfig(configResource, recordsExplicitlyAltered, recordsImplicitlyDeleted, newlyCreatedResource, forwarded); + ApiError error = validateAlterConfig(configResource, recordsExplicitlyAltered, recordsImplicitlyDeleted, newlyCreatedResource, forwarded, principal); if (error.isFailure()) { outputResults.put(configResource, error); return; diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 5da6eb2e3aa0b..9542e469450a5 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -1918,7 +1918,7 @@ public CompletableFuture> incrementalAlterConfigs( } return appendWriteEvent("incrementalAlterConfigs", context.deadlineNs(), () -> { ControllerResult> result = - configurationControl.incrementalAlterConfigs(configChanges, false, forwarded); + configurationControl.incrementalAlterConfigs(configChanges, false, forwarded, context.principal()); if (validateOnly) { return result.withoutRecords(); } else { @@ -1965,7 +1965,7 @@ public CompletableFuture> legacyAlterConfigs( } return appendWriteEvent("legacyAlterConfigs", context.deadlineNs(), () -> { ControllerResult> result = - configurationControl.legacyAlterConfigs(newConfigs, false, forwarded); + configurationControl.legacyAlterConfigs(newConfigs, false, forwarded, context.principal()); if (validateOnly) { return result.withoutRecords(); } else { diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index ae1854f3cedbb..60863918509f3 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -658,7 +658,7 @@ ControllerResult createTopics( List configRecords; if (keyToOps != null) { ControllerResult configResult = - configurationControl.incrementalAlterConfig(configResource, keyToOps, true, forwarded); + configurationControl.incrementalAlterConfig(configResource, keyToOps, true, forwarded, context.principal()); if (configResult.response().isFailure()) { topicErrors.put(topic.name(), configResult.response()); continue; @@ -761,7 +761,7 @@ private ApiError createTopic(ControllerRequestContext context, Map> assignments = new HashMap<>(); newParts.forEach((key, value) -> assignments.put(key, Replicas.toList(value.replicas))); return new CreateTopicPolicy.RequestMetadata( - topic.name(), null, null, assignments, creationConfigs); + topic.name(), null, null, assignments, creationConfigs, context.principal()); }); if (error.isFailure()) return error; } else if (topic.replicationFactor() < -1 || topic.replicationFactor() == 0) { @@ -804,7 +804,7 @@ private ApiError createTopic(ControllerRequestContext context, " time(s): " + e.getMessage()); } ApiError error = maybeCheckCreateTopicPolicy(() -> new CreateTopicPolicy.RequestMetadata( - topic.name(), numPartitions, replicationFactor, null, creationConfigs)); + topic.name(), numPartitions, replicationFactor, null, creationConfigs, context.principal())); if (error.isFailure()) return error; } int numPartitions = newParts.size(); diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java index b1e5f4c8872ba..9ebbaa8a2b8ae 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.metadata.FeatureLevelRecord; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.metadata.KafkaConfigSchema; import org.apache.kafka.metadata.RecordTestUtils; import org.apache.kafka.metadata.SupportedConfigChecker; @@ -176,7 +177,8 @@ public void testIncrementalAlterConfigs() { entry("quux", entry(SET, "abc")))), entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123"))))), true, - false); + false, + KafkaPrincipal.ANONYMOUS); assertEquals(ControllerResult.atomicOf(List.of(new ApiMessageAndVersion( new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic"). @@ -193,7 +195,7 @@ public void testIncrementalAlterConfigs() { toMap(entry(MYTOPIC, ApiError.NONE))), manager.incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap( entry("abc", entry(DELETE, "xyz"))))), - true, false)); + true, false, KafkaPrincipal.ANONYMOUS)); } @Test @@ -205,7 +207,7 @@ public void testIncrementalAlterConfig() { Map> keyToOps = toMap(entry("abc", entry(APPEND, "123"))); ControllerResult result = manager. - incrementalAlterConfig(MYTOPIC, keyToOps, true, false); + incrementalAlterConfig(MYTOPIC, keyToOps, true, false, KafkaPrincipal.ANONYMOUS); assertEquals(ControllerResult.atomicOf(List.of(new ApiMessageAndVersion( new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic"). @@ -218,13 +220,13 @@ public void testIncrementalAlterConfig() { new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic"). setName("abc").setValue(null), CONFIG_RECORD.highestSupportedVersion())), ApiError.NONE), - manager.incrementalAlterConfig(MYTOPIC, toMap(entry("abc", entry(DELETE, "xyz"))), true, false)); + manager.incrementalAlterConfig(MYTOPIC, toMap(entry("abc", entry(DELETE, "xyz"))), true, false, KafkaPrincipal.ANONYMOUS)); // The configuration value exceeding the maximum size is not allowed to be added. String largeValue = new String(new char[Short.MAX_VALUE - APPEND.id() - 1]); Map> largeValueOfOps = toMap(entry("abc", entry(APPEND, largeValue))); - ControllerResult invalidConfigValueResult = manager.incrementalAlterConfig(MYTOPIC, largeValueOfOps, true, false); + ControllerResult invalidConfigValueResult = manager.incrementalAlterConfig(MYTOPIC, largeValueOfOps, true, false, KafkaPrincipal.ANONYMOUS); assertEquals(Errors.INVALID_CONFIG, invalidConfigValueResult.response().error()); assertEquals("The configuration value cannot be added because it exceeds the maximum value size of " + Short.MAX_VALUE + " bytes.", invalidConfigValueResult.response().message()); @@ -238,7 +240,7 @@ public void testIncrementalAlterMultipleConfigValues() { build(); ControllerResult> result = manager. - incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123,456,789"))))), true, false); + incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123,456,789"))))), true, false, KafkaPrincipal.ANONYMOUS); assertEquals(ControllerResult.atomicOf(List.of(new ApiMessageAndVersion( new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic"). @@ -249,7 +251,7 @@ public void testIncrementalAlterMultipleConfigValues() { // It's ok for the appended value to be already present result = manager - .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123,456"))))), true, false); + .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123,456"))))), true, false, KafkaPrincipal.ANONYMOUS); assertEquals( ControllerResult.atomicOf(List.of(), toMap(entry(MYTOPIC, ApiError.NONE))), result @@ -257,7 +259,7 @@ public void testIncrementalAlterMultipleConfigValues() { RecordTestUtils.replayAll(manager, result.records()); result = manager - .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", entry(SUBTRACT, "123,456"))))), true, false); + .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", entry(SUBTRACT, "123,456"))))), true, false, KafkaPrincipal.ANONYMOUS); assertEquals(ControllerResult.atomicOf(List.of(new ApiMessageAndVersion( new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic"). setName("abc").setValue("789"), CONFIG_RECORD.highestSupportedVersion())), @@ -267,7 +269,7 @@ public void testIncrementalAlterMultipleConfigValues() { // It's ok for the deleted value not to be present result = manager - .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", entry(SUBTRACT, "123456"))))), true, false); + .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", entry(SUBTRACT, "123456"))))), true, false, KafkaPrincipal.ANONYMOUS); assertEquals( ControllerResult.atomicOf(List.of(), toMap(entry(MYTOPIC, ApiError.NONE))), result @@ -291,7 +293,8 @@ public void testIncrementalAlterConfigsWithoutExistence() { entry("quux", entry(SET, "1")))), entry(existingTopic, toMap(entry("def", entry(SET, "newVal"))))), false, - false); + false, + KafkaPrincipal.ANONYMOUS); assertEquals(ControllerResult.atomicOf(List.of(new ApiMessageAndVersion( new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("ExistingTopic"). @@ -364,9 +367,9 @@ public void testIncrementalAlterConfigsWithPolicy() { ), toMap(entry(MYTOPIC, new ApiError(Errors.POLICY_VIOLATION, "Expected: AlterConfigPolicy.RequestMetadata(resource=ConfigResource(" + - "type=TOPIC, name='mytopic'), configs={}). Got: " + + "type=TOPIC, name='mytopic'), configs={}, principal=null). Got: " + "AlterConfigPolicy.RequestMetadata(resource=ConfigResource(" + - "type=TOPIC, name='mytopic'), configs={foo.bar=123})")), + "type=TOPIC, name='mytopic'), configs={foo.bar=123}, principal=User:ANONYMOUS)")), entry(BROKER0, ApiError.NONE))), manager.incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap( entry("foo.bar", entry(SET, "123")))), @@ -376,7 +379,63 @@ public void testIncrementalAlterConfigsWithPolicy() { entry("broker.config.to.remove", entry(DELETE, null)) ))), true, - false)); + false, + KafkaPrincipal.ANONYMOUS)); + } + + private static class CapturingAlterConfigsPolicy implements AlterConfigPolicy { + private final List captured = new ArrayList<>(); + + @Override + public void validate(RequestMetadata requestMetadata) throws PolicyViolationException { + captured.add(requestMetadata); + } + + @Override + public void close() { + // nothing to do + } + + @Override + public void configure(Map configs) { + // nothing to do + } + } + + @Test + public void testIncrementalAlterConfigsPassesPrincipalToPolicy() { + CapturingAlterConfigsPolicy policy = new CapturingAlterConfigsPolicy(); + ConfigurationControlManager manager = new ConfigurationControlManager.Builder(). + setFeatureControl(createFeatureControlManager()). + setKafkaConfigSchema(SCHEMA). + setAlterConfigPolicy(Optional.of(policy)). + build(); + KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "tenant-a"); + manager.incrementalAlterConfigs( + toMap(entry(MYTOPIC, toMap(entry("foo.bar", entry(SET, "123"))))), + true, + false, + principal); + assertEquals(1, policy.captured.size()); + assertEquals(Optional.of(principal), policy.captured.get(0).principal()); + } + + @Test + public void testLegacyAlterConfigsPassesPrincipalToPolicy() { + CapturingAlterConfigsPolicy policy = new CapturingAlterConfigsPolicy(); + ConfigurationControlManager manager = new ConfigurationControlManager.Builder(). + setFeatureControl(createFeatureControlManager()). + setKafkaConfigSchema(SCHEMA). + setAlterConfigPolicy(Optional.of(policy)). + build(); + KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "tenant-b"); + manager.legacyAlterConfigs( + toMap(entry(MYTOPIC, toMap(entry("foo.bar", "123")))), + true, + false, + principal); + assertEquals(1, policy.captured.size()); + assertEquals(Optional.of(principal), policy.captured.get(0).principal()); } private static class CheckForNullValuesPolicy implements AlterConfigPolicy { @@ -418,7 +477,7 @@ public void testLegacyAlterConfigs() { expectedRecords1, toMap(entry(MYTOPIC, ApiError.NONE))), manager.legacyAlterConfigs( toMap(entry(MYTOPIC, toMap(entry("abc", "456"), entry("def", "901")))), - true, false)); + true, false, KafkaPrincipal.ANONYMOUS)); for (ApiMessageAndVersion message : expectedRecords1) { manager.replay((ConfigRecord) message.message()); } @@ -432,7 +491,7 @@ expectedRecords1, toMap(entry(MYTOPIC, ApiError.NONE))), CONFIG_RECORD.highestSupportedVersion())), toMap(entry(MYTOPIC, ApiError.NONE))), manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("def", "901")))), - true, false)); + true, false, KafkaPrincipal.ANONYMOUS)); } @ParameterizedTest @@ -448,7 +507,7 @@ public void testMaybeGenerateElrSafetyRecords(boolean setStaticConfig) { Map> keyToOps = toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "3"))); ConfigResource brokerConfigResource = new ConfigResource(ConfigResource.Type.BROKER, "1"); - ControllerResult result = manager.incrementalAlterConfig(brokerConfigResource, keyToOps, true, false); + ControllerResult result = manager.incrementalAlterConfig(brokerConfigResource, keyToOps, true, false, KafkaPrincipal.ANONYMOUS); assertEquals(Set.of(), manager.brokersWithConfigs()); assertEquals(ControllerResult.atomicOf(List.of(new ApiMessageAndVersion( @@ -513,7 +572,7 @@ public void testRejectMinIsrChangeWhenElrEnabled(boolean removal) { result = manager.incrementalAlterConfig(new ConfigResource(ConfigResource.Type.BROKER, "1"), toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, removal ? entry(DELETE, null) : entry(SET, "3"))), - true, false); + true, false, KafkaPrincipal.ANONYMOUS); assertEquals(Errors.INVALID_CONFIG, result.response().error()); assertEquals("Broker-level min.insync.replicas cannot be altered while ELR is enabled.", result.response().message()); @@ -522,7 +581,7 @@ public void testRejectMinIsrChangeWhenElrEnabled(boolean removal) { result = manager.incrementalAlterConfig(new ConfigResource(ConfigResource.Type.BROKER, ""), toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, removal ? entry(DELETE, null) : entry(SET, "3"))), - true, false); + true, false, KafkaPrincipal.ANONYMOUS); if (removal) { assertEquals(Errors.INVALID_CONFIG, result.response().error()); assertEquals("Cluster-level min.insync.replicas cannot be removed while ELR is enabled.", @@ -585,12 +644,12 @@ public void testCordonedLogDirsFeature(boolean enabled) { ControllerResult result = manager.incrementalAlterConfig(new ConfigResource(ConfigResource.Type.BROKER, "1"), toMap(entry(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, entry(SET, ""))), - true, false); + true, false, KafkaPrincipal.ANONYMOUS); assertEquals(enabled ? ApiError.NONE : DISABLED_CORDONED_LOG_DIRS_ERROR, result.response()); result = manager.incrementalAlterConfig(new ConfigResource(ConfigResource.Type.BROKER, "1"), toMap(entry(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, entry(SET, "*"))), - true, false); + true, false, KafkaPrincipal.ANONYMOUS); assertEquals(enabled ? INVALID_CORDONED_LOG_DIRS_ERROR : DISABLED_CORDONED_LOG_DIRS_ERROR, result.response()); } @@ -632,7 +691,8 @@ public void testValidateAlterConfigWithInvalidExistingConfigs() { MYTOPIC, toMap(entry("def", entry(SET, "newValue"))), false, - false); + false, + KafkaPrincipal.ANONYMOUS); assertEquals(ApiError.NONE, result.response()); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index b1ca959b4bf72..b5011b8d00604 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -708,7 +708,7 @@ public void testMinIsrUpdateWithElr() throws Throwable { // First, decrease the min ISR config to 1. This should clear the ELR fields. ControllerResult> result = active.configurationControl().incrementalAlterConfigs(toMap( entry(new ConfigResource(TOPIC, "foo"), toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))), - true, false); + true, false, KafkaPrincipal.ANONYMOUS); assertEquals(2, result.records().size(), result.records().toString()); RecordTestUtils.replayAll(active.configurationControl(), List.of(result.records().get(0))); RecordTestUtils.replayAll(active.replicationControl(), List.of(result.records().get(1))); @@ -724,7 +724,7 @@ public void testMinIsrUpdateWithElr() throws Throwable { result = active.configurationControl().incrementalAlterConfigs(toMap( entry(new ConfigResource(BROKER, ""), toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))), - true, false); + true, false, KafkaPrincipal.ANONYMOUS); assertEquals(2, result.records().size(), result.records().toString()); RecordTestUtils.replayAll(active.configurationControl(), List.of(result.records().get(0))); RecordTestUtils.replayAll(active.replicationControl(), List.of(result.records().get(1))); diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index cb7a74b334c2e..87d5c3ae4b498 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -62,6 +62,7 @@ import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment; +import org.apache.kafka.common.message.RequestHeaderData; import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord; import org.apache.kafka.common.metadata.ClearElrRecord; import org.apache.kafka.common.metadata.ConfigRecord; @@ -76,6 +77,7 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AlterPartitionRequest; import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; @@ -120,10 +122,12 @@ import java.util.Map; import java.util.Optional; import java.util.OptionalInt; +import java.util.OptionalLong; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -922,6 +926,40 @@ public void testCreateTopicsWithPolicy() { ctx.createTestTopic("quux", new int[][] {new int[] {1, 2, 0}}, POLICY_VIOLATION.code()); } + @Test + public void testCreateTopicsPassesPrincipalToPolicy() { + AtomicReference> captured = new AtomicReference<>(); + CreateTopicPolicy policy = new CreateTopicPolicy() { + @Override + public void validate(RequestMetadata requestMetadata) throws PolicyViolationException { + captured.set(requestMetadata.principal()); + } + + @Override + public void close() { /* Nothing to do */ } + + @Override + public void configure(Map configs) { /* Nothing to do */ } + }; + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder(). + setCreateTopicPolicy(policy). + build(); + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "tenant-a"); + ControllerRequestContext requestContext = new ControllerRequestContext( + new RequestHeaderData().setRequestApiKey(ApiKeys.CREATE_TOPICS.id), + principal, + OptionalLong.empty()); + CreateTopicsRequestData request = new CreateTopicsRequestData(); + request.topics().add(new CreatableTopic().setName("foo"). + setNumPartitions(1).setReplicationFactor((short) 1)); + + ctx.replicationControl.createTopics(requestContext, request, Set.of("foo"), false); + assertEquals(Optional.of(principal), captured.get()); + } + @Test public void testCreateTopicsWithPolicyUnexpectedException() { CreateTopicPolicy policy = new CreateTopicPolicy() { @@ -2890,19 +2928,19 @@ public void testMaybeTriggerUncleanLeaderElectionForLeaderlessPartitions(String Map.of(new ConfigResource(ConfigResource.Type.BROKER, ""), Map.of(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, new AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "true"))), - true, false).records()); + true, false, KafkaPrincipal.ANONYMOUS).records()); } else if (uncleanConfig.equals("dynamic_node")) { ctx.replay(ctx.configurationControl.incrementalAlterConfigs( Map.of(new ConfigResource(ConfigResource.Type.BROKER, "0"), Map.of(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, new AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "true"))), - true, false).records()); + true, false, KafkaPrincipal.ANONYMOUS).records()); } else if (uncleanConfig.equals("dynamic_topic")) { ctx.replay(ctx.configurationControl.incrementalAlterConfigs( Map.of(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), Map.of(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, new AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "true"))), - true, false).records()); + true, false, KafkaPrincipal.ANONYMOUS).records()); } ControllerResult balanceResult = replication.maybeElectUncleanLeaders(); assertFalse(balanceResult.response()); @@ -3495,13 +3533,13 @@ void testElrsRemovedOnMinIsrUpdate(boolean clusterLevel, boolean useLegacyAlterC ctx.replay(ctx.configurationControl.legacyAlterConfigs( Map.of(configResource, Map.of(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")), - false, false).records()); + false, false, KafkaPrincipal.ANONYMOUS).records()); } else { ctx.replay(ctx.configurationControl.incrementalAlterConfigs( Map.of(configResource, Map.of(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, new AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "1"))), - false, false).records()); + false, false, KafkaPrincipal.ANONYMOUS).records()); } assertArrayEquals(new int[]{}, ctx.replicationControl.getPartition(fooId, 0).elr); if (clusterLevel) {