-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(consumer): add recovery from no leader partitions #3101
Open
liutao365
wants to merge
3
commits into
IBM:main
Choose a base branch
from
liutao365:main
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+43
−13
Open
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean, this function is now used exclusively here, and only here. So, assigning it to a local variable doesn’t seem all that necessary. Instead, we could just use:
(Also, there’s no reason to pass
sess
through, since it’s already available by closure.topic
andpartition
are loopvars, so we still want to pass them through as parameters to get a local copy for go 1.21, which would otherwise misbehave if the variables were accessed through closure.)Also, this would let us pull the
sess.waitGroup.Add(1)
out of the function body, but still keep it spatially close to the pairedDone()
. We usually want to prefer adding to the waitgroup outside of thego func()
because we cannot be sure at what time the new goroutine will run, and thus performing theAdd
in the goroutine means there is a race condition within which awg.Wait()
could fail to block properly.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pulling the anonymous go routine func out into a named one was mostly aesthetic / personal preference, but in this scenario where it was nested within a double for loop, I felt it made it easier to at a glance understand what the current code was doing (spinning off a blocking goroutine for each topic-partition) and decoupled what that goroutine was doing from the loop
In general I agree about incrementing waitGroups outside of the go routine. However, in this case we're only ever using the waitGroup.Wait() in cleanup to give any active consumers time to complete. If we haven't started the goroutine then the consumer won't be active and if it starts post-cleanup the sess context will already be done so consume won't proceed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can see esthetics of removing indentions. I personally like to avoid assigning things that are only used once to temporary values because one of the two hard problems in computers is naming things.
For me, this kind of breaks up the code flow (one of the ideas for why golang uses goroutines rather than promises to get async behavior). While, sure, it is nice to remove the indention levels of the anonymous go func, it still only executes in one place, and that’s the for loop. Like we would recommend spitting the body of a for loop out if we weren’t starting a new goroutine with it right? So, it just feels weird. It just kind of feels a bit too clever.
But that said, it’s not wrong, so… 🤷♀️
As for the
WaitGroup
handling, I can see an argument for why in this particular instance it isn’t necessarily racey, I like to practice something I call “code hygiene”. Like, even if I know my hands aren’t dirty in this specific case before I start handling food, I still make myself wash them anyways, because it helps build and reinforce good habits. (Though, of course, always beware of good habits becoming unnecessary as the language makes them unnecessary. c.f. the loopvar guarding.)P.S. In case it wasn’t perfectly clear. I have no objections to the code as is.