Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions internal/pipeline/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import (
// channel. Inputs remain coupled to their outputs as they propagate the
// response channel in the transaction.
type Pool struct {
workers []processor.Pipeline
workers []processor.Pipeline
msgProcessors []processor.V1

log log.Modular

Expand All @@ -37,14 +38,17 @@ func NewPool(threads int, log log.Modular, msgProcessors ...processor.V1) (*Pool
}

p := &Pool{
workers: make([]processor.Pipeline, threads),
log: log,
messagesOut: make(chan message.Transaction),
shutSig: shutdown.NewSignaller(),
workers: make([]processor.Pipeline, threads),
msgProcessors: msgProcessors,
log: log,
messagesOut: make(chan message.Transaction),
shutSig: shutdown.NewSignaller(),
}

for i := range p.workers {
p.workers[i] = NewProcessor(msgProcessors...)
proc := NewProcessor(msgProcessors...)
proc.noCloseProcs = true
p.workers[i] = proc
}

return p, nil
Expand All @@ -68,6 +72,12 @@ func (p *Pool) loop() {
}
}

for _, c := range p.msgProcessors {
if err := c.Close(closeNowCtx); err != nil {
break
}
}

close(p.messagesOut)
p.shutSig.TriggerHasStopped()
}()
Expand Down
91 changes: 91 additions & 0 deletions internal/pipeline/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package pipeline_test
import (
"context"
"reflect"
"sync"
"testing"
"time"

Expand All @@ -20,6 +21,25 @@ import (
_ "github.com/redpanda-data/benthos/v4/internal/impl/pure"
)

// blockingProcessor is a test double implementing the processor.V1 interface:
// ProcessBatch blocks until blockCh is closed, then passes messages through
// unchanged, and Close invokes closeFn (when set) so tests can observe when
// the pool shuts its processors down. closeFn is called on Close.
type blockingProcessor struct {
	blockCh chan struct{} // closed by the test to release a parked ProcessBatch call
	closeFn func()        // optional hook invoked once per Close call; may be nil
}

// ProcessBatch parks until blockCh is closed, then forwards the batch
// untouched as a single-element result with no error.
func (b *blockingProcessor) ProcessBatch(_ context.Context, batch message.Batch) ([]message.Batch, error) {
	// Wait here until the test releases us by closing blockCh.
	<-b.blockCh
	out := []message.Batch{batch}
	return out, nil
}

// Close runs the optional closeFn hook, if one was provided, and always
// reports success.
func (b *blockingProcessor) Close(_ context.Context) error {
	if fn := b.closeFn; fn != nil {
		fn()
	}
	return nil
}

func TestPoolBasic(t *testing.T) {
ctx, done := context.WithTimeout(t.Context(), time.Second*30)
defer done()
Expand Down Expand Up @@ -262,6 +282,77 @@ func TestPoolMultiThreads(t *testing.T) {
require.NoError(t, proc.WaitForClose(ctx))
}

// TestPoolProcessorsNotClosedEarly verifies that a worker exiting early
// (because the input channel closed) does not close the shared processors
// while a sibling worker is still mid-batch; the pool must only close its
// processors once the whole pool shuts down.
func TestPoolProcessorsNotClosedEarly(t *testing.T) {
	ctx, cancel := context.WithTimeout(t.Context(), time.Second*30)
	defer cancel()

	var (
		mut       sync.Mutex
		wasClosed bool
		closedSig = make(chan struct{})
		release   = make(chan struct{})
	)

	blocker := &blockingProcessor{
		blockCh: release,
		closeFn: func() {
			mut.Lock()
			defer mut.Unlock()
			wasClosed = true
			close(closedSig)
		},
	}

	p, err := pipeline.NewPool(2, log.Noop(), blocker)
	require.NoError(t, err)

	inChan := make(chan message.Transaction)
	ackChan := make(chan error, 1)

	require.NoError(t, p.Consume(inChan))

	// Feed exactly one message: one worker grabs it and parks inside the
	// processor, leaving the other worker free to observe the input close.
	select {
	case inChan <- message.NewTransaction(message.QuickBatch([][]byte{[]byte("hello")}), ackChan):
	case <-ctx.Done():
		t.Fatal("timed out sending message")
	}

	// Shut the input. The idle worker exits; the busy one stays parked.
	close(inChan)

	// Allow the exiting worker's defer to run. With the bug present it
	// would close the shared processor at this point.
	time.Sleep(50 * time.Millisecond)

	mut.Lock()
	prematurelyClosed := wasClosed
	mut.Unlock()
	require.False(t, prematurelyClosed, "processor was closed while another worker was still processing")

	// Release the parked worker so it can finish the batch.
	close(release)

	// Pull the resulting transaction off the output and ack it.
	select {
	case outT := <-p.TransactionChan():
		require.NoError(t, outT.Ack(ctx, nil))
	case <-ctx.Done():
		t.Fatal("timed out reading output")
	}

	// Full shutdown — only now should the pool close its processors.
	require.NoError(t, p.WaitForClose(ctx))

	// The closeFn hook must have fired by the time WaitForClose returns.
	select {
	case <-closedSig:
	case <-ctx.Done():
		t.Fatal("processor was never closed")
	}
}

func TestPoolMultiNaturalClose(t *testing.T) {
conf := pipeline.NewConfig()
conf.Threads = 2
Expand Down
10 changes: 6 additions & 4 deletions internal/pipeline/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
// The processor will read from a source, perform some processing, and then
// either propagate a new message or drop it.
type Processor struct {
noCloseProcs bool
msgProcessors []processor.V1

messagesOut chan message.Transaction
Expand Down Expand Up @@ -46,10 +47,11 @@ func (p *Processor) loop() {
defer cnDone()

defer func() {
// Signal all children to close.
for _, c := range p.msgProcessors {
if err := c.Close(closeNowCtx); err != nil {
break
if !p.noCloseProcs {
for _, c := range p.msgProcessors {
if err := c.Close(closeNowCtx); err != nil {
break
}
}
}

Expand Down
Loading