-
Notifications
You must be signed in to change notification settings - Fork 250
kafka: extend client.id parsing per producer/consumer #426
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,3 +8,35 @@ NOTE: This is a slightly modified version with two key differences: | |
- there is a new argument that converts all output result json files into a single csv. | ||
|
||
TODO: Document these changes. | ||
|
||
## Features | ||
|
||
### Zone-aware workers | ||
|
||
To pass a zone/rack ID (e.g. a cloud availability zone name) into the `client.id` of the Kafka clients (producer and consumer), set the system property `zone.id` and use the template `{zone.id}` in the `client.id` config of any of the `commonConfig`, `producerConfig`, or `consumerConfig` driver values. | ||
|
||
When running workers, pass the `zone.id`: | ||
|
||
```bash | ||
export JVM_OPTS=-Dzone.id=az0 | ||
/opt/benchmark/bin/benchmark-worker | ||
``` | ||
|
||
Then pass the `client.id` template: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not use something like |
||
```yaml | ||
commonConfig: | | ||
bootstrap.servers=localhost:9092 | ||
client.id=omb-client_az={zone.id} | ||
``` | ||
|
||
This generates `client.id=omb-client_az=value` for both the producer and the consumer, where `value` is the `zone.id` passed to the worker (e.g. `az0`). | ||
|
||
```yaml | ||
producerConfig: | | ||
client.id=omb-producer_az={zone.id} | ||
consumerConfig: | | ||
auto.offset.reset=earliest | ||
client.id=omb-consumer_az={zone.id} | ||
``` | ||
|
||
This generates `client.id=omb-producer_az=value` for the producer and `client.id=omb-consumer_az=value` for the consumer, where `value` is the `zone.id` passed to the worker (e.g. `az0`). |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,9 +53,10 @@ public class KafkaBenchmarkDriver implements BenchmarkDriver { | |
private List<BenchmarkProducer> producers = Collections.synchronizedList(new ArrayList<>()); | ||
private List<BenchmarkConsumer> consumers = Collections.synchronizedList(new ArrayList<>()); | ||
|
||
private Properties topicProperties; | ||
private Properties producerProperties; | ||
private Properties consumerProperties; | ||
// Visible for testing | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd use a Hashmap/table + enum for the three types |
||
Properties topicProperties; | ||
Properties producerProperties; | ||
Properties consumerProperties; | ||
|
||
private AdminClient admin; | ||
|
||
|
@@ -76,6 +77,14 @@ public void initialize(File configurationFile, StatsLogger statsLogger) throws I | |
producerProperties = new Properties(); | ||
commonProperties.forEach((key, value) -> producerProperties.put(key, value)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, this seems inefficient |
||
producerProperties.load(new StringReader(config.producerConfig)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. StringReader could be replaced with a BAIS |
||
|
||
if (producerProperties.containsKey(KAFKA_CLIENT_ID)) { | ||
producerProperties.put( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't there a putIfAbsent inversion? |
||
KAFKA_CLIENT_ID, | ||
applyZoneId( | ||
producerProperties.getProperty(KAFKA_CLIENT_ID), System.getProperty(ZONE_ID_CONFIG))); | ||
} | ||
|
||
producerProperties.put( | ||
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); | ||
producerProperties.put( | ||
|
@@ -84,6 +93,14 @@ public void initialize(File configurationFile, StatsLogger statsLogger) throws I | |
consumerProperties = new Properties(); | ||
commonProperties.forEach((key, value) -> consumerProperties.put(key, value)); | ||
consumerProperties.load(new StringReader(config.consumerConfig)); | ||
|
||
if (consumerProperties.containsKey(KAFKA_CLIENT_ID)) { | ||
consumerProperties.put( | ||
KAFKA_CLIENT_ID, | ||
applyZoneId( | ||
consumerProperties.getProperty(KAFKA_CLIENT_ID), System.getProperty(ZONE_ID_CONFIG))); | ||
} | ||
|
||
consumerProperties.put( | ||
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); | ||
consumerProperties.put( | ||
|
@@ -165,7 +182,8 @@ private static String applyZoneId(String clientId, String zoneId) { | |
return clientId.replace(ZONE_ID_TEMPLATE, zoneId); | ||
} | ||
|
||
private static final ObjectMapper mapper = | ||
// Visible for testing | ||
static final ObjectMapper mapper = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not having this as a static final member is inefficient |
||
new ObjectMapper(new YAMLFactory()) | ||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
/* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.openmessaging.benchmark.driver.kafka; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
|
||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
import org.junit.jupiter.api.io.TempDir; | ||
import org.junit.jupiter.params.ParameterizedTest; | ||
import org.junit.jupiter.params.provider.CsvSource; | ||
|
||
class KafkaBenchmarkDriverTest { | ||
@TempDir Path tempDir; | ||
|
||
@ParameterizedTest | ||
@CsvSource({ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should ideally externalize this as a resource file |
||
"client.id=test_az={zone.id},\"\",\"\",test_az=az0,test_az=az0", | ||
"client.id=test_az={zone.id},client.id=prod_az={zone.id},client.id=cons_az={zone.id},prod_az=az0,cons_az=az0", | ||
"\"\",client.id=prod_az={zone.id},client.id=cons_az={zone.id},prod_az=az0,cons_az=az0", | ||
"\"\",client.id=prod_az={zone.id},\"\",prod_az=az0,", | ||
"\"\",\"\",client.id=cons_az={zone.id},,cons_az=az0" | ||
}) | ||
void testInitClientIdWithZoneId( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not use a String array as your param? Overloaded parameters are a code smell |
||
String commonConfig, | ||
String producerConfig, | ||
String consumerConfig, | ||
String producerClientId, | ||
String consumerClientId) | ||
throws Exception { | ||
// Given these configs | ||
final Path configPath = tempDir.resolve("config"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: move config magic string to an enum or constant |
||
Config config = new Config(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You might want to try dependency spy injection here |
||
config.replicationFactor = 1; | ||
config.commonConfig = "bootstrap.servers=localhost:9092\n" + commonConfig; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should really use test containers here with official images as a controllable Junit flag |
||
config.producerConfig = producerConfig; | ||
config.consumerConfig = consumerConfig; | ||
config.topicConfig = ""; | ||
|
||
// and the system property set for zone id | ||
System.setProperty("zone.id", "az0"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This modifies the global JVM system. Shouldn't do that ever in a unit or integration test |
||
|
||
try (KafkaBenchmarkDriver driver = new KafkaBenchmarkDriver()) { | ||
// When initializing kafka driver | ||
Files.write(configPath, KafkaBenchmarkDriver.mapper.writeValueAsBytes(config)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use a ByteArray Stream rather than a file |
||
driver.initialize(configPath.toFile(), null); | ||
|
||
// Then | ||
if (producerClientId != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: Optional types are better than nullable Objects |
||
assertThat(driver.producerProperties).containsEntry("client.id", producerClientId); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Assert against CommonConfig constants not magic string |
||
} else { | ||
assertThat(driver.producerProperties).doesNotContainKey("client.id"); | ||
} | ||
if (consumerClientId != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You should assert nullability based on parameterized test order, otherwise this is just lazy testing |
||
assertThat(driver.consumerProperties).containsEntry("client.id", consumerClientId); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I assume containsEntry does the key check short circuit? |
||
} else { | ||
assertThat(driver.consumerProperties).doesNotContainKey("client.id"); | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit
KAFKA_OPTS