From 874db8a99f7bb69a79c2693f4db10820020de194 Mon Sep 17 00:00:00 2001 From: Jorge Quilcate Date: Sun, 30 Mar 2025 14:08:08 +0300 Subject: [PATCH] kafka: add client.id parsing per producer/consumer --- driver-kafka/README.md | 32 +++++++++ .../driver/kafka/KafkaBenchmarkDriver.java | 26 +++++-- .../kafka/KafkaBenchmarkDriverTest.java | 72 +++++++++++++++++++ 3 files changed, 126 insertions(+), 4 deletions(-) create mode 100644 driver-kafka/src/test/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriverTest.java diff --git a/driver-kafka/README.md b/driver-kafka/README.md index eef66e161..96b2c4b65 100644 --- a/driver-kafka/README.md +++ b/driver-kafka/README.md @@ -8,3 +8,35 @@ NOTE: This is a slightly modified version with two key differences: - there is a new argument that converts all output result json files into a single csv. TODO: Document these changes. + +## Features + +### Zone-aware workers + +To include a zone/rack ID (e.g. a cloud availability zone name) in the `client.id` of the Kafka clients (producer, consumer), set the system property `zone.id` on the worker and use the template `{zone.id}` in the `client.id` config of the `commonConfig`, `producerConfig`, or `consumerConfig` driver values. 
+ +When running workers, pass the `zone.id`: + +```bash +export JVM_OPTS=-Dzone.id=az0 +/opt/benchmark/bin/benchmark-worker +``` + +Then set the `client.id` template in the driver configuration: +```yaml +commonConfig: | + bootstrap.servers=localhost:9092 + client.id=omb-client_az={zone.id} +``` + +With `zone.id=az0`, this generates `client.id=omb-client_az=az0` for both the producer and the consumer. The template can also be set separately per client type: + +```yaml +producerConfig: | + client.id=omb-producer_az={zone.id} +consumerConfig: | + auto.offset.reset=earliest + client.id=omb-consumer_az={zone.id} +``` + +With `zone.id=az0`, this generates `client.id=omb-producer_az=az0` for the producer and `client.id=omb-consumer_az=az0` for the consumer. diff --git a/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java b/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java index d506b5b37..a2244606d 100644 --- a/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java +++ b/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java @@ -53,9 +53,10 @@ public class KafkaBenchmarkDriver implements BenchmarkDriver { private List producers = Collections.synchronizedList(new ArrayList<>()); private List consumers = Collections.synchronizedList(new ArrayList<>()); - private Properties topicProperties; - private Properties producerProperties; - private Properties consumerProperties; + // Visible for testing + Properties topicProperties; + Properties producerProperties; + Properties consumerProperties; private AdminClient admin; @@ -76,6 +77,14 @@ public void initialize(File configurationFile, StatsLogger statsLogger) throws I producerProperties = new Properties(); commonProperties.forEach((key, value) -> producerProperties.put(key, value)); producerProperties.load(new StringReader(config.producerConfig)); + + if (producerProperties.containsKey(KAFKA_CLIENT_ID)) { + producerProperties.put( + KAFKA_CLIENT_ID, + applyZoneId( + producerProperties.getProperty(KAFKA_CLIENT_ID), 
System.getProperty(ZONE_ID_CONFIG))); + } + producerProperties.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProperties.put( @@ -84,6 +93,14 @@ public void initialize(File configurationFile, StatsLogger statsLogger) throws I consumerProperties = new Properties(); commonProperties.forEach((key, value) -> consumerProperties.put(key, value)); consumerProperties.load(new StringReader(config.consumerConfig)); + + if (consumerProperties.containsKey(KAFKA_CLIENT_ID)) { + consumerProperties.put( + KAFKA_CLIENT_ID, + applyZoneId( + consumerProperties.getProperty(KAFKA_CLIENT_ID), System.getProperty(ZONE_ID_CONFIG))); + } + consumerProperties.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put( @@ -165,7 +182,8 @@ private static String applyZoneId(String clientId, String zoneId) { return clientId.replace(ZONE_ID_TEMPLATE, zoneId); } - private static final ObjectMapper mapper = + // Visible for testing + static final ObjectMapper mapper = new ObjectMapper(new YAMLFactory()) .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); } diff --git a/driver-kafka/src/test/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriverTest.java b/driver-kafka/src/test/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriverTest.java new file mode 100644 index 000000000..f6dd568e0 --- /dev/null +++ b/driver-kafka/src/test/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriverTest.java @@ -0,0 +1,72 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.benchmark.driver.kafka; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.file.Files; +import java.nio.file.Path; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +class KafkaBenchmarkDriverTest { + @TempDir Path tempDir; + + @ParameterizedTest + @CsvSource({ + "client.id=test_az={zone.id},\"\",\"\",test_az=az0,test_az=az0", + "client.id=test_az={zone.id},client.id=prod_az={zone.id},client.id=cons_az={zone.id},prod_az=az0,cons_az=az0", + "\"\",client.id=prod_az={zone.id},client.id=cons_az={zone.id},prod_az=az0,cons_az=az0", + "\"\",client.id=prod_az={zone.id},\"\",prod_az=az0,", + "\"\",\"\",client.id=cons_az={zone.id},,cons_az=az0" + }) + void testInitClientIdWithZoneId( + String commonConfig, + String producerConfig, + String consumerConfig, + String producerClientId, + String consumerClientId) + throws Exception { + // Given these configs + final Path configPath = tempDir.resolve("config"); + Config config = new Config(); + config.replicationFactor = 1; + config.commonConfig = "bootstrap.servers=localhost:9092\n" + commonConfig; + config.producerConfig = producerConfig; + config.consumerConfig = consumerConfig; + config.topicConfig = ""; + + // and the system property set for zone id + System.setProperty("zone.id", "az0"); + + try (KafkaBenchmarkDriver driver = new KafkaBenchmarkDriver()) { + // When initializing kafka driver + Files.write(configPath, 
KafkaBenchmarkDriver.mapper.writeValueAsBytes(config)); + driver.initialize(configPath.toFile(), null); + + // Then + if (producerClientId != null) { + assertThat(driver.producerProperties).containsEntry("client.id", producerClientId); + } else { + assertThat(driver.producerProperties).doesNotContainKey("client.id"); + } + if (consumerClientId != null) { + assertThat(driver.consumerProperties).containsEntry("client.id", consumerClientId); + } else { + assertThat(driver.consumerProperties).doesNotContainKey("client.id"); + } + } + } +}