@@ -2271,24 +2271,23 @@ func TestAckShouldNotCloseChannel_GH296(t *testing.T) {
2271
2271
t .Fatalf ("Consume error: %v" , err )
2272
2272
}
2273
2273
2274
- signal := make (chan bool )
2274
+ signal := make (chan int )
2275
2275
acks := make (chan * Delivery , messageCount )
2276
2276
2277
2277
go func () {
2278
2278
counter := 0
2279
2279
for ;; {
2280
2280
select {
2281
2281
case msg := <- msgs :
2282
- t .Logf ("starting worker for message: %d" , msg .DeliveryTag )
2283
- go worker (t , acks , & msg )
2282
+ go worker (acks , & msg )
2284
2283
case ack := <- acks :
2285
2284
ackError := ack .Ack (false )
2286
2285
if ackError != nil {
2287
2286
t .Logf ("Ack error: %v" , ackError )
2288
2287
}
2289
2288
counter = counter + 1
2290
2289
if counter >= messageCount {
2291
- signal <- true
2290
+ signal <- counter
2292
2291
return
2293
2292
}
2294
2293
case <- time .After (500 * time .Millisecond ):
@@ -2302,18 +2301,16 @@ func TestAckShouldNotCloseChannel_GH296(t *testing.T) {
2302
2301
t .Logf ("saw connection closure error: %v" , connError )
2303
2302
case channelError := <- notifyChannelClosed :
2304
2303
t .Logf ("saw channel closure error: %v" , channelError )
2305
- case <- signal :
2306
- t .Logf ("saw %d messages" , messageCount )
2304
+ case count := <- signal :
2305
+ t .Logf ("saw %d messages" , count )
2307
2306
case <- time .After (5 * time .Second ):
2308
2307
t .Fatalf ("timed out waiting to see %d messages" , messageCount )
2309
2308
}
2310
2309
}
2311
2310
2312
- func worker (t * testing.T , acks chan <- * Delivery , msg * Delivery ) {
2313
- t .Logf ("worker processing message: %d" , msg .DeliveryTag )
2311
+ func worker (acks chan <- * Delivery , msg * Delivery ) {
2314
2312
time .Sleep (time .Millisecond * time .Duration (msg .DeliveryTag ) * 100 )
2315
2313
acks <- msg
2316
- t .Logf ("worker done processing message: %d" , msg .DeliveryTag )
2317
2314
}
2318
2315
2319
2316
/*
0 commit comments