-
Notifications
You must be signed in to change notification settings - Fork 31
Optional kafka segment configs #29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Conflicts: bulkerapp/app/topic_manager.go
|
Your cubic subscription is currently inactive. Please reactivate your subscription to receive AI reviews and use cubic. |
kafkabase/kafka_config.go
Outdated
| config["segment.ms"] = fmt.Sprint(c.KafkaTopicSegmentHours * 60 * 60 * 1000) | ||
| } | ||
| default: | ||
| config["compression.type"] = c.KafkaTopicCompression |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should set "compression.type" for all modes with map init
kafkabase/kafka_config.go
Outdated
|
|
||
| switch mode { | ||
| case "retry": | ||
| config["retention.ms"] = fmt.Sprint(c.KafkaTopicRetentionHours * 60 * 60 * 1000) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KafkaRetryTopicRetentionHours
bulkerapp/app/batch_consumer.go
Outdated
|
|
||
| }() | ||
| err = bc.topicManager.ensureTopic(bc.retryTopic, 1, bc.topicManager.RetryTopicConfig()) | ||
| err = bc.topicManager.ensureTopic(bc.retryTopic, 1, bc.topicManager.config.TopicConfig("retry")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bc has config property too
bulkerapp/app/stream_consumer.go
Outdated
| failedTopic = sc.config.KafkaDestinationsDeadLetterTopicName | ||
| } else { | ||
| err = sc.topicManager.ensureTopic(sc.retryTopic, 1, sc.topicManager.RetryTopicConfig()) | ||
| err = sc.topicManager.ensureTopic(sc.retryTopic, 1, sc.topicManager.config.TopicConfig("retry")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sc has config property too
|
thanks for reviewing @absorbb !! just addressed your feedback |
This is an attempt to make kafka segment configs optional so bulker can work with providers that do not allow clients to set those configs, AWS MSK for example.
It introduces a new config option
KafkaAllowSegmentConfigwhich is enabled by default, same behaviour as currentmainbranch, but allow users to disable it.