Skip to content

Commit f907fbc

Browse files
committed
queue: detect close of incoming channel
1 parent 278915e commit f907fbc

File tree

2 files changed

+47
-2
lines changed

2 files changed

+47
-2
lines changed

queue/queue.go

+25-2
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,18 @@ func (cq *ConcurrentQueue) start() {
5858
go func() {
5959
defer cq.wg.Done()
6060

61+
readLoop:
6162
for {
6263
nextElement := cq.overflow.Front()
6364
if nextElement == nil {
6465
// Overflow queue is empty so incoming items can be pushed
6566
// directly to the output channel. If output channel is full
6667
// though, push to overflow.
6768
select {
68-
case item := <-cq.chanIn:
69+
case item, ok := <-cq.chanIn:
70+
if !ok {
71+
break readLoop
72+
}
6973
select {
7074
case cq.chanOut <- item:
7175
// Optimistically push directly to chanOut
@@ -79,7 +83,10 @@ func (cq *ConcurrentQueue) start() {
7983
// Overflow queue is not empty, so any new items get pushed to
8084
// the back to preserve order.
8185
select {
82-
case item := <-cq.chanIn:
86+
case item, ok := <-cq.chanIn:
87+
if !ok {
88+
break readLoop
89+
}
8390
cq.overflow.PushBack(item)
8491
case cq.chanOut <- nextElement.Value:
8592
cq.overflow.Remove(nextElement)
@@ -88,6 +95,22 @@ func (cq *ConcurrentQueue) start() {
8895
}
8996
}
9097
}
98+
99+
// Incoming channel has been closed. Empty overflow queue into
100+
// the outgoing channel.
101+
nextElement := cq.overflow.Front()
102+
for nextElement != nil {
103+
select {
104+
case cq.chanOut <- nextElement.Value:
105+
cq.overflow.Remove(nextElement)
106+
case <-cq.quit:
107+
return
108+
}
109+
nextElement = cq.overflow.Front()
110+
}
111+
112+
// Close outgoing channel.
113+
close(cq.chanOut)
91114
}()
92115
}
93116

queue/queue_test.go

+22
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,25 @@ func TestConcurrentQueueIdempotentStop(t *testing.T) {
6363

6464
testQueueAddDrain(t, 100, 1, 10, 1000, 1000)
6565
}
66+
67+
// TestQueueCloseIncoming tests that the queue properly handles an incoming
68+
// channel that is closed.
69+
func TestQueueCloseIncoming(t *testing.T) {
70+
t.Parallel()
71+
72+
queue := queue.NewConcurrentQueue(10)
73+
queue.Start()
74+
75+
queue.ChanIn() <- 1
76+
close(queue.ChanIn())
77+
78+
item := <-queue.ChanOut()
79+
if item.(int) != 1 {
80+
t.Fatalf("unexpected item")
81+
}
82+
83+
_, ok := <-queue.ChanOut()
84+
if ok {
85+
t.Fatalf("expected outgoing channel being closed")
86+
}
87+
}

0 commit comments

Comments
 (0)