Skip to content

pipeline: defer processor close in pool until all workers exit#393

Merged
mmatczuk merged 3 commits intomainfrom
connect-1655
Mar 19, 2026
Merged

pipeline: defer processor close in pool until all workers exit#393
mmatczuk merged 3 commits intomainfrom
connect-1655

Conversation

@twmb
Copy link
Copy Markdown
Contributor

@twmb twmb commented Mar 16, 2026

When pipeline.threads > 1, all worker goroutines share the same processor instances. Previously, when any worker's input channel closed, its loop() defer would call Close() on the shared processors — even while other workers were still mid-processing. This caused errors like "sql: database is closed".

Add a noCloseProcs flag to Processor so that pool workers skip closing processors in their defer. Instead, Pool.loop() closes the shared processors after all workers have exited.

Fixes redpanda-data/connect#1655

@claude
Copy link
Copy Markdown

claude bot commented Mar 19, 2026

Commits
LGTM

Review
The fix correctly prevents early closure of shared processors when a pool worker exits before others finish. The Pool now owns processor lifecycle, waiting for all workers via WaitForClose before closing processors. Test coverage validates the race condition scenario.

LGTM

@mmatczuk mmatczuk merged commit e575a36 into main Mar 19, 2026
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Block shutdown if processors are still doing work

2 participants