Skip to content

kafka: extend client.id parsing per producer/consumer #426

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

jeqo
Copy link
Contributor

@jeqo jeqo commented Mar 30, 2025

For further flexibility, it may be useful to differentiate client ids when passing a zone id.

Currently, if a zone id variable is needed, it needs to be in common configurations, forcing both producer and consumer to have the same values (with different zones).

This proposal extends the same parsing to client ids from producer and consumers; adds some unit tests; and documents this feature.

@jeqo jeqo force-pushed the kafka-client-id-per-prod-cons branch from 0a00002 to 874db8a Compare March 30, 2025 21:18
Copy link

@OneCricketeer OneCricketeer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Observations


When running workers, pass the `zone.id`:

```bash

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit KAFKA_OPTS

/opt/benchmark/bin/benchmark-worker
```

Then pass the `client.id` template:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use something like envsubst

private Properties topicProperties;
private Properties producerProperties;
private Properties consumerProperties;
// Visible for testing
Copy link

@OneCricketeer OneCricketeer Apr 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd use a Hashmap/table + enum for the three types

@@ -76,6 +77,14 @@ public void initialize(File configurationFile, StatsLogger statsLogger) throws I
producerProperties = new Properties();
commonProperties.forEach((key, value) -> producerProperties.put(key, value));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, this seems inefficient

@@ -76,6 +77,14 @@ public void initialize(File configurationFile, StatsLogger statsLogger) throws I
producerProperties = new Properties();
commonProperties.forEach((key, value) -> producerProperties.put(key, value));
producerProperties.load(new StringReader(config.producerConfig));

if (producerProperties.containsKey(KAFKA_CLIENT_ID)) {
producerProperties.put(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't there a putIfAbsent inversion?


try (KafkaBenchmarkDriver driver = new KafkaBenchmarkDriver()) {
// When initializing kafka driver
Files.write(configPath, KafkaBenchmarkDriver.mapper.writeValueAsBytes(config));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a ByteArray Stream rather than a file


// Then
if (producerClientId != null) {
assertThat(driver.producerProperties).containsEntry("client.id", producerClientId);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assert against CommonConfig constants not magic string

driver.initialize(configPath.toFile(), null);

// Then
if (producerClientId != null) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Optional types are better than nullable Objects

} else {
assertThat(driver.producerProperties).doesNotContainKey("client.id");
}
if (consumerClientId != null) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should assert nullability based on parameterized test order , otherwise this is just lazy testing

assertThat(driver.producerProperties).doesNotContainKey("client.id");
}
if (consumerClientId != null) {
assertThat(driver.consumerProperties).containsEntry("client.id", consumerClientId);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume containsEntry does the key check short circuit?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants