Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Investigate GH-296 #297

Merged
merged 7 commits into from
Feb 7, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 108 additions & 24 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2205,6 +2205,114 @@ func TestDeliveryAckShouldReturnSpecificErrorOnClosedChannel(t *testing.T) {
}
}

// https://github.com/rabbitmq/amqp091-go/issues/11
func TestShouldNotWaitAfterConnectionClosedNewChannelCreatedIssue11(t *testing.T) {
conn := integrationConnection(t, "TestShouldNotWaitAfterConnectionClosedNewChannelCreatedIssue11")
ch, err := conn.Channel()
if err != nil {
t.Fatalf("channel error: %v", err)
}

conn.NotifyClose(make(chan *Error, 1))

_, err = ch.PublishWithDeferredConfirmWithContext(context.TODO(), "issue11", "issue11", false, false, Publishing{Body: []byte("abc")})
if err != nil {
t.Fatalf("PublishWithDeferredConfirm error: %v", err)
}

ch.Close()
conn.Close()

_, err = conn.Channel()
if err == nil {
t.Fatalf("Opening a channel from a closed connection should not block but returning an error %v", err)
}
}

// https://github.com/rabbitmq/amqp091-go/issues/296
func TestAckShouldNotCloseChannel_GH296(t *testing.T) {
// setup
const messageCount = 10
queueName := t.Name()
c, ch := integrationQueue(t, queueName)
defer ch.Close()
defer c.Close()

notifyConnClosed := make(chan *Error)
c.NotifyClose(notifyConnClosed)

notifyChannelClosed := make(chan *Error)
ch.NotifyClose(notifyChannelClosed)

for i := 0; i < messageCount; i++ {
err := ch.Publish(DefaultExchange, queueName, false, false, Publishing{
Body: []byte("this is a test"),
})
if err != nil {
t.Fatalf("publish error: %v", err)
}
}

err := ch.Qos(3, 0, false)
if err != nil {
t.Fatalf("Qos error: %v", err)
}

msgs, err := ch.Consume(
queueName, // queue
t.Name(), // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
t.Fatalf("Consume error: %v", err)
}

signal := make(chan int)
acks := make(chan *Delivery, messageCount)

go func() {
counter := 0
for {
select {
case msg := <-msgs:
go worker(acks, &msg)
case ack := <-acks:
ackError := ack.Ack(false)
if ackError != nil {
t.Logf("Ack error: %v", ackError)
}
counter = counter + 1
if counter >= messageCount {
signal <- counter
return
}
case <-time.After(500 * time.Millisecond):
t.Log("timed out waiting to do something")
}
}
}()

select {
case connError := <-notifyConnClosed:
t.Logf("saw connection closure error: %v", connError)
case channelError := <-notifyChannelClosed:
t.Logf("saw channel closure error: %v", channelError)
case count := <-signal:
t.Logf("saw %d messages", count)
case <-time.After(5 * time.Second):
t.Fatalf("timed out waiting to see %d messages", messageCount)
}
}

func worker(acks chan<- *Delivery, msg *Delivery) {
time.Sleep(time.Millisecond * time.Duration(msg.DeliveryTag) * 100)
acks <- msg
}

/*
* Support for integration tests
*/
Expand Down Expand Up @@ -2279,30 +2387,6 @@ func assertConsumeBody(t *testing.T, messages <-chan Delivery, want []byte) (msg
return msg
}

// https://github.com/rabbitmq/amqp091-go/issues/11
func TestShouldNotWaitAfterConnectionClosedNewChannelCreatedIssue11(t *testing.T) {
conn := integrationConnection(t, "TestShouldNotWaitAfterConnectionClosedNewChannelCreatedIssue11")
ch, err := conn.Channel()
if err != nil {
t.Fatalf("channel error: %v", err)
}

conn.NotifyClose(make(chan *Error, 1))

_, err = ch.PublishWithDeferredConfirmWithContext(context.TODO(), "issue11", "issue11", false, false, Publishing{Body: []byte("abc")})
if err != nil {
t.Fatalf("PublishWithDeferredConfirm error: %v", err)
}

ch.Close()
conn.Close()

_, err = conn.Channel()
if err == nil {
t.Fatalf("Opening a channel from a closed connection should not block but returning an error %v", err)
}
}

// Pulls out the CRC and verifies the remaining content against the CRC
func assertMessageCrc32(t *testing.T, msg []byte, assert string) {
t.Helper()
Expand Down
Loading