
fix(consumer): add recovery from no leader partitions #3101

Open
wants to merge 3 commits into main

Conversation

liutao365

When some topic partitions have no leader due to Kafka broker failures, the Sarama consumer group should be able to continue consuming partitions that have leaders and resume consuming the partitions that previously had no leader once they return to normal. This pull request addresses this issue.
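For illustration, a minimal sketch of the general recovery pattern being described, with hypothetical helper names (startPartitions, consume); this is not the actual diff in this PR:

// Illustrative only, not the PR's code: partitions whose leader lookup
// fails are set aside for retry while the rest keep consuming.
// Assumes: import "github.com/IBM/sarama"
func startPartitions(client sarama.Client, topic string, partitions []int32,
    consume func(string, int32)) (noLeader []int32) {
    for _, p := range partitions {
        if _, err := client.Leader(topic, p); err != nil {
            // e.g. broker down: defer this partition and retry it after
            // a later metadata refresh instead of failing the session
            noLeader = append(noLeader, p)
            continue
        }
        go consume(topic, p)
    }
    return noLeader
}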


Signed-off-by: liutao366 <[email protected]>
Contributor

@puellanivis puellanivis left a comment


👍 nothing significant I can see that should be improved. While there is a suggestion here, I don’t think it’s necessarily any better than the existing approach.

@liutao365
Author

Add test case:

  1. Create a topic with more than 6 partitions and a replication factor of 1 (to simplify the test case), and produce a few million messages.
  2. Shut down one of the Kafka brokers, which will cause some of the topic partitions to lose their leader.
  3. Run examples/consumergroup/main.go with an additional configuration option (the screenshot of the exact setting is not reproduced here; see the sketch after this list).
  4. Restore the broker that was previously shut down.
  5. Describe the consumer group and check that none of the partitions have any consumer lag.
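A minimal sketch of the consumer-group setup for step 3. The exact configuration option from the missing screenshot is not recoverable, so only a generic setup is shown; the broker address and group name are placeholders:

package main

import (
    "log"

    "github.com/IBM/sarama"
)

func main() {
    // Generic consumer-group setup; the specific option from the missing
    // screenshot is unknown, so nothing beyond the basics is claimed here.
    config := sarama.NewConfig()
    config.Version = sarama.V2_8_0_0 // set to match the broker version
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    group, err := sarama.NewConsumerGroup(
        []string{"localhost:9092"}, "example-group", config)
    if err != nil {
        log.Fatalln(err)
    }
    defer group.Close()
    // ... then the group.Consume(...) loop as in examples/consumergroup/main.go
}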

@dnwe dnwe changed the title from "fix: Fix the problem when some partitions have no leader" to "fix(consumer): add recovery from no leader partitions" on Feb 19, 2025
@dnwe dnwe force-pushed the main branch 3 times, most recently from 990524c to 5dc4e24 on February 19, 2025 16:53
dnwe added 2 commits February 19, 2025 17:09
Just to avoid some duplication here

Signed-off-by: Dominic Evans <[email protected]>
@dnwe
Collaborator

dnwe commented Feb 19, 2025

@liutao365 thanks for proposing this change, the approach looks good to me. I added a commit to fix up the client locking (the FV was failing due to the race condition) and another commit to slightly refactor the consume-partition code into a single named func to cover both paths. Can you take a look and confirm you're happy? Also @puellanivis, if you wouldn't mind re-reviewing, that would be great too.

We should probably add a unit test to cover this scenario, but I'm happy for us to do that in a follow-up PR, as it is probably worth landing this fix sooner rather than later.

@dnwe dnwe added the fix label Feb 19, 2025
@liutao365
Author


Hi @dnwe, I've read the changes and they're better than my original commit. I'm happy with these improvements.

Member

@prestona prestona left a comment


LGTM!

// start consuming each topic partition in a goroutine
for topic, partitions := range claims {
    for _, partition := range partitions {
        go consumeTopicPartition(sess, topic, partition)
Contributor


I mean, this function is now used exclusively here, and only here. So, assigning it to a local variable doesn’t seem all that necessary. Instead, we could just use:

go func(topic string, partition int32) {
  // function body here.
}(topic, partition)

(Also, there’s no reason to pass sess through, since it’s already available by closure. topic and partition are loopvars, so we still want to pass them through as parameters to get a local copy for go 1.21, which would otherwise misbehave if the variables were accessed through closure.)
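A short illustration of the loopvar pitfall under Go 1.21 and earlier (consume is a hypothetical helper):

// Buggy under Go <= 1.21: the range variables are reused across iterations,
// so these goroutines may all observe the final values of topic/partition.
for topic, partitions := range claims {
    for _, partition := range partitions {
        go func() { consume(topic, partition) }() // captured by closure
    }
}
// Passing topic and partition as parameters, as in the suggestion above,
// gives each goroutine its own copy. Go 1.22 made loop variables
// per-iteration, which makes the closure form safe on newer toolchains.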

Also, this would let us pull the sess.waitGroup.Add(1) out of the function body, but still keep it spatially close to the paired Done(). We usually want to prefer adding to the waitgroup outside of the go func() because we cannot be sure at what time the new goroutine will run, and thus performing the Add in the goroutine means there is a race condition within which a wg.Wait() could fail to block properly.
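Concretely, the safe ordering looks like this (hypothetical body; assumes import "sync"):

var wg sync.WaitGroup

wg.Add(1) // Add happens-before the goroutine starts, so Wait can't miss it
go func() {
    defer wg.Done()
    // ... consume the partition ...
}()

// If the Add were inside the goroutine instead, a Wait() that runs before
// the goroutine is scheduled could return while work is still pending.
wg.Wait()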

Collaborator


Pulling the anonymous goroutine func out into a named one was mostly aesthetic / personal preference, but in this scenario, where it was nested within a double for loop, I felt it made it easier to understand at a glance what the current code was doing (spinning off a blocking goroutine for each topic-partition) and decoupled what that goroutine was doing from the loop.

In general I agree about incrementing WaitGroups outside of the goroutine. However, in this case we're only ever using waitGroup.Wait() in cleanup, to give any active consumers time to complete. If we haven't started the goroutine, then the consumer won't be active; and if it starts post-cleanup, the sess context will already be done, so consume won't proceed.
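A sketch of that reasoning (sess.Context() is the real ConsumerGroupSession accessor; sess.waitGroup follows the excerpt above, and the rest is hypothetical, not the exact PR code):

// If this goroutine only starts after cleanup, the session context is
// already done, so it returns without consuming; missing its Add in a
// prior Wait() is then harmless because there was no work to wait for.
go func(topic string, partition int32) {
    sess.waitGroup.Add(1)
    defer sess.waitGroup.Done()
    if sess.Context().Err() != nil {
        return // session already cleaned up; nothing to consume
    }
    // ... consume the claim ...
}(topic, partition)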

Contributor

@puellanivis puellanivis Feb 20, 2025


I can see the aesthetics of removing indentation. I personally like to avoid assigning things that are only used once to temporary values, because one of the two hard problems in computer science is naming things.

For me, this kind of breaks up the code flow (one of the ideas behind golang using goroutines rather than promises to get async behavior). While, sure, it is nice to remove the indentation levels of the anonymous go func, it still only executes in one place, and that's the for loop. We wouldn't recommend splitting the body of a for loop out into a named function if we weren't starting a new goroutine with it, right? So it just feels weird, a bit too clever.

But that said, it’s not wrong, so… 🤷‍♀️

As for the WaitGroup handling, I can see an argument for why it isn't necessarily racy in this particular instance, but I like to practice something I call "code hygiene". Even if I know my hands aren't dirty in this specific case before I start handling food, I still make myself wash them anyway, because it helps build and reinforce good habits. (Though, of course, always beware of good habits becoming unnecessary as the language makes them unnecessary; cf. the loopvar guarding.)

P.S. In case it wasn’t perfectly clear. I have no objections to the code as is.
