Skip to content

Commit 874db8a

Browse files
committed
kafka: add client.id parsing per producer/consumer
1 parent b10b227 commit 874db8a

3 files changed

Lines changed: 126 additions & 4 deletions

File tree

driver-kafka/README.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,35 @@ NOTE: This is a slightly modified version with two key differences:
88
- there is a new argument that converts all output result json files into a single csv.
99

1010
TODO: Document these changes.
11+
12+
## Features
13+
14+
### Zone-aware workers
15+
16+
To pass a zone/rack ID (e.g. cloud region availability zone name) to the Kafka clients (producer, consumer) client-id configuration, use the system property `zone.id`, and use the template `{zone.id}` on the `client.id` config, either on the `commonConfig`, `producerConfig`, or `consumerConfig` Driver values.
17+
18+
When running workers, pass the `zone.id`:
19+
20+
```bash
21+
export JVM_OPTS=-Dzone.id=az0
22+
/opt/benchmark/bin/benchmark-worker
23+
```
24+
25+
Then pass the `client.id` template:
26+
```yaml
27+
commonConfig: |
28+
bootstrap.servers=localhost:9092
29+
client.id=omb-client_az={zone.id}
30+
```
31+
32+
This generates producer and consumer `client.id=omb-client_az=value`
33+
34+
```yaml
35+
producerConfig: |
36+
client.id=omb-producer_az={zone.id}
37+
consumerConfig: |
38+
auto.offset.reset=earliest
39+
client.id=omb-consumer_az={zone.id}
40+
```
41+
42+
This generates producer `client.id=omb-producer_az=value` and consumer `client.id=omb-consumer_az=value`

driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,10 @@ public class KafkaBenchmarkDriver implements BenchmarkDriver {
5353
private List<BenchmarkProducer> producers = Collections.synchronizedList(new ArrayList<>());
5454
private List<BenchmarkConsumer> consumers = Collections.synchronizedList(new ArrayList<>());
5555

56-
private Properties topicProperties;
57-
private Properties producerProperties;
58-
private Properties consumerProperties;
56+
// Visible for testing
57+
Properties topicProperties;
58+
Properties producerProperties;
59+
Properties consumerProperties;
5960

6061
private AdminClient admin;
6162

@@ -76,6 +77,14 @@ public void initialize(File configurationFile, StatsLogger statsLogger) throws I
7677
producerProperties = new Properties();
7778
commonProperties.forEach((key, value) -> producerProperties.put(key, value));
7879
producerProperties.load(new StringReader(config.producerConfig));
80+
81+
if (producerProperties.containsKey(KAFKA_CLIENT_ID)) {
82+
producerProperties.put(
83+
KAFKA_CLIENT_ID,
84+
applyZoneId(
85+
producerProperties.getProperty(KAFKA_CLIENT_ID), System.getProperty(ZONE_ID_CONFIG)));
86+
}
87+
7988
producerProperties.put(
8089
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
8190
producerProperties.put(
@@ -84,6 +93,14 @@ public void initialize(File configurationFile, StatsLogger statsLogger) throws I
8493
consumerProperties = new Properties();
8594
commonProperties.forEach((key, value) -> consumerProperties.put(key, value));
8695
consumerProperties.load(new StringReader(config.consumerConfig));
96+
97+
if (consumerProperties.containsKey(KAFKA_CLIENT_ID)) {
98+
consumerProperties.put(
99+
KAFKA_CLIENT_ID,
100+
applyZoneId(
101+
consumerProperties.getProperty(KAFKA_CLIENT_ID), System.getProperty(ZONE_ID_CONFIG)));
102+
}
103+
87104
consumerProperties.put(
88105
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
89106
consumerProperties.put(
@@ -165,7 +182,8 @@ private static String applyZoneId(String clientId, String zoneId) {
165182
return clientId.replace(ZONE_ID_TEMPLATE, zoneId);
166183
}
167184

168-
private static final ObjectMapper mapper =
185+
// Visible for testing
186+
static final ObjectMapper mapper =
169187
new ObjectMapper(new YAMLFactory())
170188
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
171189
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.openmessaging.benchmark.driver.kafka;
15+
16+
import static org.assertj.core.api.Assertions.assertThat;
17+
18+
import java.nio.file.Files;
19+
import java.nio.file.Path;
20+
import org.junit.jupiter.api.io.TempDir;
21+
import org.junit.jupiter.params.ParameterizedTest;
22+
import org.junit.jupiter.params.provider.CsvSource;
23+
24+
class KafkaBenchmarkDriverTest {
25+
@TempDir Path tempDir;
26+
27+
@ParameterizedTest
28+
@CsvSource({
29+
"client.id=test_az={zone.id},\"\",\"\",test_az=az0,test_az=az0",
30+
"client.id=test_az={zone.id},client.id=prod_az={zone.id},client.id=cons_az={zone.id},prod_az=az0,cons_az=az0",
31+
"\"\",client.id=prod_az={zone.id},client.id=cons_az={zone.id},prod_az=az0,cons_az=az0",
32+
"\"\",client.id=prod_az={zone.id},\"\",prod_az=az0,",
33+
"\"\",\"\",client.id=cons_az={zone.id},,cons_az=az0"
34+
})
35+
void testInitClientIdWithZoneId(
36+
String commonConfig,
37+
String producerConfig,
38+
String consumerConfig,
39+
String producerClientId,
40+
String consumerClientId)
41+
throws Exception {
42+
// Given these configs
43+
final Path configPath = tempDir.resolve("config");
44+
Config config = new Config();
45+
config.replicationFactor = 1;
46+
config.commonConfig = "bootstrap.servers=localhost:9092\n" + commonConfig;
47+
config.producerConfig = producerConfig;
48+
config.consumerConfig = consumerConfig;
49+
config.topicConfig = "";
50+
51+
// and the system property set for zone id
52+
System.setProperty("zone.id", "az0");
53+
54+
try (KafkaBenchmarkDriver driver = new KafkaBenchmarkDriver()) {
55+
// When initializing kafka driver
56+
Files.write(configPath, KafkaBenchmarkDriver.mapper.writeValueAsBytes(config));
57+
driver.initialize(configPath.toFile(), null);
58+
59+
// Then
60+
if (producerClientId != null) {
61+
assertThat(driver.producerProperties).containsEntry("client.id", producerClientId);
62+
} else {
63+
assertThat(driver.producerProperties).doesNotContainKey("client.id");
64+
}
65+
if (consumerClientId != null) {
66+
assertThat(driver.consumerProperties).containsEntry("client.id", consumerClientId);
67+
} else {
68+
assertThat(driver.consumerProperties).doesNotContainKey("client.id");
69+
}
70+
}
71+
}
72+
}

0 commit comments

Comments
 (0)