Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
* <p>An interface for enforcing a policy on alter configs requests.
Expand All @@ -42,15 +44,31 @@ class RequestMetadata {

private final ConfigResource resource;
private final Map<String, String> configs;
private final KafkaPrincipal principal;

/**
* Create an instance of this class with the provided parameters.
*
* This constructor is public to make testing of <code>AlterConfigPolicy</code> implementations easier.
*/
public RequestMetadata(ConfigResource resource, Map<String, String> configs) {
this(resource, configs, null);
}

/**
* Create an instance of this class with the provided parameters, including the principal that initiated the
* request.
*
* This constructor is public to make testing of <code>AlterConfigPolicy</code> implementations easier.
*
* @param resource the resource whose configs are being altered.
* @param configs the configs in the request.
* @param principal the authenticated principal that initiated the request, or null if not available.
*/
public RequestMetadata(ConfigResource resource, Map<String, String> configs, KafkaPrincipal principal) {
this.resource = resource;
this.configs = configs;
this.principal = principal;
}

/**
Expand All @@ -64,6 +82,16 @@ public ConfigResource resource() {
return resource;
}

/**
* Return the authenticated principal that initiated the request, if available. This may be empty when the
* metadata is constructed without a principal (for example, in tests or via the legacy constructor).
*/
public Optional<KafkaPrincipal> principal() {
return Optional.ofNullable(principal);
}

// The principal is intentionally excluded from equals/hashCode: it is request-scoped metadata about who
// initiated the request, not part of the identity of the requested config change.
@Override
public int hashCode() {
return Objects.hash(resource, configs);
Expand All @@ -80,7 +108,8 @@ public boolean equals(Object o) {
@Override
public String toString() {
return "AlterConfigPolicy.RequestMetadata(resource=" + resource +
", configs=" + configs + ")";
", configs=" + configs +
", principal=" + principal + ")";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
* <p>An interface for enforcing a policy on create topics requests.
Expand All @@ -45,6 +47,7 @@ class RequestMetadata {
private final Short replicationFactor;
private final Map<Integer, List<Integer>> replicasAssignments;
private final Map<String, String> configs;
private final KafkaPrincipal principal;

/**
* Create an instance of this class with the provided parameters.
Expand All @@ -61,11 +64,33 @@ class RequestMetadata {
*/
public RequestMetadata(String topic, Integer numPartitions, Short replicationFactor,
Map<Integer, List<Integer>> replicasAssignments, Map<String, String> configs) {
this(topic, numPartitions, replicationFactor, replicasAssignments, configs, null);
}

/**
* Create an instance of this class with the provided parameters, including the principal that initiated the
* request.
*
* This constructor is public to make testing of <code>CreateTopicPolicy</code> implementations easier.
*
* @param topic the name of the topic to create.
* @param numPartitions the number of partitions to create or null if replicasAssignments is set.
* @param replicationFactor the replication factor for the topic or null if replicaAssignments is set.
* @param replicasAssignments replica assignments or null if numPartitions and replicationFactor is set. The
* assignment is a map from partition id to replica (broker) ids.
* @param configs topic configs for the topic to be created, not including broker defaults. Broker configs are
* passed via the {@code configure()} method of the policy implementation.
* @param principal the authenticated principal that initiated the request, or null if not available.
*/
public RequestMetadata(String topic, Integer numPartitions, Short replicationFactor,
Map<Integer, List<Integer>> replicasAssignments, Map<String, String> configs,
KafkaPrincipal principal) {
this.topic = topic;
this.numPartitions = numPartitions;
this.replicationFactor = replicationFactor;
this.replicasAssignments = replicasAssignments == null ? null : Collections.unmodifiableMap(replicasAssignments);
this.configs = Collections.unmodifiableMap(configs);
this.principal = principal;
}

/**
Expand Down Expand Up @@ -105,6 +130,16 @@ public Map<String, String> configs() {
return configs;
}

/**
* Return the authenticated principal that initiated the request, if available. This may be empty when the
* metadata is constructed without a principal (for example, in tests or via the legacy constructor).
*/
public Optional<KafkaPrincipal> principal() {
return Optional.ofNullable(principal);
}

// The principal is intentionally excluded from equals/hashCode: it is request-scoped metadata about who
// initiated the request, not part of the identity of the requested topic.
@Override
public int hashCode() {
return Objects.hash(topic, numPartitions, replicationFactor,
Expand All @@ -129,7 +164,8 @@ public String toString() {
", numPartitions=" + numPartitions +
", replicationFactor=" + replicationFactor +
", replicasAssignments=" + replicasAssignments +
", configs=" + configs + ")";
", configs=" + configs +
", principal=" + principal + ")";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@

import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata;

import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class AlterConfigPolicyTest {

Expand All @@ -49,4 +52,51 @@ public void testRequestMetadataEquals() {
Collections.emptyMap()
));
}

@Test
public void testPrincipalDefaultsToEmpty() {
RequestMetadata requestMetadata = new RequestMetadata(
new ConfigResource(Type.BROKER, "0"),
Collections.singletonMap("foo", "bar")
);
assertEquals(Optional.empty(), requestMetadata.principal());
}

@Test
public void testPrincipalIsExposed() {
KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice");
RequestMetadata requestMetadata = new RequestMetadata(
new ConfigResource(Type.BROKER, "0"),
Collections.singletonMap("foo", "bar"),
principal
);
assertEquals(Optional.of(principal), requestMetadata.principal());
}

@Test
public void testPrincipalExcludedFromEqualsAndHashCode() {
RequestMetadata withoutPrincipal = new RequestMetadata(
new ConfigResource(Type.BROKER, "0"),
Collections.singletonMap("foo", "bar")
);
RequestMetadata withPrincipal = new RequestMetadata(
new ConfigResource(Type.BROKER, "0"),
Collections.singletonMap("foo", "bar"),
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice")
);
// The principal is request-scoped metadata and must not affect equality of the requested change.
assertEquals(withoutPrincipal, withPrincipal);
assertEquals(withoutPrincipal.hashCode(), withPrincipal.hashCode());
}

@Test
public void testToStringIncludesPrincipal() {
RequestMetadata requestMetadata = new RequestMetadata(
new ConfigResource(Type.BROKER, "0"),
Collections.singletonMap("foo", "bar"),
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice")
);
assertTrue(requestMetadata.toString().contains("principal=User:alice"),
"Expected toString to contain the principal, but was: " + requestMetadata);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.policy;

import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata;

import org.junit.jupiter.api.Test;

import java.util.Map;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class CreateTopicPolicyTest {

@Test
public void testPrincipalDefaultsToEmpty() {
RequestMetadata requestMetadata = new RequestMetadata(
"topic", 1, (short) 1, null, Map.of());
assertEquals(Optional.empty(), requestMetadata.principal());
}

@Test
public void testPrincipalIsExposed() {
KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice");
RequestMetadata requestMetadata = new RequestMetadata(
"topic", 1, (short) 1, null, Map.of(), principal);
assertEquals(Optional.of(principal), requestMetadata.principal());
}

@Test
public void testPrincipalExcludedFromEqualsAndHashCode() {
RequestMetadata withoutPrincipal = new RequestMetadata(
"topic", 1, (short) 1, null, Map.of());
RequestMetadata withPrincipal = new RequestMetadata(
"topic", 1, (short) 1, null, Map.of(),
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice"));
// The principal is request-scoped metadata and must not affect equality of the requested topic.
assertEquals(withoutPrincipal, withPrincipal);
assertEquals(withoutPrincipal.hashCode(), withPrincipal.hashCode());
}

@Test
public void testToStringIncludesPrincipal() {
RequestMetadata requestMetadata = new RequestMetadata(
"topic", 1, (short) 1, null, Map.of(),
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice"));
assertTrue(requestMetadata.toString().contains("principal=User:alice"),
"Expected toString to contain the principal, but was: " + requestMetadata);
}
}
Loading
Loading