Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(consumer): add recovery from no leader partitions #3101

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ type Client interface {
// LeastLoadedBroker retrieves broker that has the least responses pending
LeastLoadedBroker() *Broker

// check if partition is readable
PartitionNotReadable(topic string, partition int32) bool

// Close shuts down all broker connections managed by this client. It is required
// to call this function before a client object passes out of scope, as it will
// otherwise leak memory. You must close any Producers or Consumers using a client
Expand Down Expand Up @@ -1283,3 +1286,14 @@ type nopCloserClient struct {
func (ncc *nopCloserClient) Close() error {
return nil
}

func (client *client) PartitionNotReadable(topic string, partition int32) bool {
client.lock.RLock()
defer client.lock.RUnlock()

pm := client.metadata[topic][partition]
if pm == nil {
return true
}
return pm.Leader == -1
}
42 changes: 29 additions & 13 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,21 +861,37 @@ func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims
return nil, err
}

// start consuming
for topic, partitions := range claims {
for _, partition := range partitions {
sess.waitGroup.Add(1)

go func(topic string, partition int32) {
defer sess.waitGroup.Done()
// consume a single topic/partition, blocking
consumeTopicPartition := func(sess *consumerGroupSession, topic string, partition int32) {
sess.waitGroup.Add(1)
defer sess.waitGroup.Done()

// cancel the group session as soon as any of the consume calls return
defer sess.cancel()

// if partition not currently readable, wait for it to become readable
if sess.parent.client.PartitionNotReadable(topic, partition) {
timer := time.NewTimer(5 * time.Second)
for sess.parent.client.PartitionNotReadable(topic, partition) {
select {
case <-ctx.Done():
return
case <-parent.closed:
return
case <-timer.C:
timer.Reset(5 * time.Second)
}
}
timer.Stop()
}

// cancel the as session as soon as the first
// goroutine exits
defer sess.cancel()
sess.consume(topic, partition)
}

// consume a single topic/partition, blocking
sess.consume(topic, partition)
}(topic, partition)
// start consuming each topic partition in a goroutine
for topic, partitions := range claims {
for _, partition := range partitions {
go consumeTopicPartition(sess, topic, partition)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, this function is now used exclusively here, and only here. So, assigning it to a local variable doesn’t seem all that necessary. Instead, we could just use:

go func(topic string, partition int32) {
  // function body here.
}(topic, partition)

(Also, there’s no reason to pass sess through, since it’s already available by closure. topic and partition are loopvars, so we still want to pass them through as parameters to get a local copy for go 1.21, which would otherwise misbehave if the variables were accessed through closure.)

Also, this would let us pull the sess.waitGroup.Add(1) out of the function body, but still keep it spatially close to the paired Done(). We usually want to prefer adding to the waitgroup outside of the go func() because we cannot be sure at what time the new goroutine will run, and thus performing the Add in the goroutine means there is a race condition within which a wg.Wait() could fail to block properly.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pulling the anonymous go routine func out into a named one was mostly aesthetic / personal preference, but in this scenario where it was nested within a double for loop, I felt it made it easier to at a glance understand what the current code was doing (spinning off a blocking goroutine for each topic-partition) and decoupled what that goroutine was doing from the loop

In general I agree about incrementing waitGroups outside of the go routine. However, in this case we're only ever using the waitGroup.Wait() in cleanup to give any active consumers time to complete. If we haven't started the goroutine then the consumer won't be active and if it starts post-cleanup the sess context will already be done so consume won't proceed.

Copy link
Contributor

@puellanivis puellanivis Feb 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see esthetics of removing indentions. I personally like to avoid assigning things that are only used once to temporary values because one of the two hard problems in computers is naming things.

For me, this kind of breaks up the code flow (one of the ideas for why golang uses goroutines rather than promises to get async behavior). While, sure, it is nice to remove the indention levels of the anonymous go func, it still only executes in one place, and that’s the for loop. Like we would recommend spitting the body of a for loop out if we weren’t starting a new goroutine with it right? So, it just feels weird. It just kind of feels a bit too clever.

But that said, it’s not wrong, so… 🤷‍♀️

As for the WaitGroup handling, I can see an argument for why in this particular instance it isn’t necessarily racey, I like to practice something I call “code hygiene”. Like, even if I know my hands aren’t dirty in this specific case before I start handling food, I still make myself wash them anyways, because it helps build and reinforce good habits. (Though, of course, always beware of good habits becoming unnecessary as the language makes them unnecessary. c.f. the loopvar guarding.)

P.S. In case it wasn’t perfectly clear. I have no objections to the code as is.

}
}
return sess, nil
Expand Down
Loading