amqp: Channel.Publish blocks indefinitely
Channel.Publish sometimes blocks indefinitely. The following code is a test case that reproduces this. It reproduces most of the time I run it. Adding a sleep inside the for loop does not help.
package main

import (
	"log"

	"github.com/streadway/amqp"
)

func main() {
	const queueName = "q1"
	if conn, stdErr := amqp.Dial("amqp://guest:guest@localhost:5672/"); stdErr != nil {
		log.Fatalln(stdErr)
	} else if channel, stdErr := conn.Channel(); stdErr != nil {
		log.Fatalln(stdErr)
	} else if stdErr := channel.Confirm(false); stdErr != nil {
		log.Fatalln(stdErr)
	} else if _, stdErr := channel.QueueDeclare(
		queueName, // queue
		true,      // durable
		false,     // autodelete
		false,     // exclusive
		false,     // nowait
		nil,       // arguments
	); stdErr != nil {
		log.Fatalln(stdErr)
	} else {
		confirmationChan := channel.NotifyPublish(make(chan amqp.Confirmation))
		for i := 0; i < 10000; i++ {
			log.Println("publish ", i)
			if stdErr := channel.Publish(
				"",        // exchange
				queueName, // routing key
				false,     // mandatory
				false,     // immediate
				amqp.Publishing{
					Body: []byte{},
				}); stdErr != nil {
				log.Fatalln(stdErr)
			} else {
				log.Println("pre confirmation check")
				confirmation := <-confirmationChan
				log.Println("post confirmation check")
				if !confirmation.Ack {
					log.Fatalln("confirmation not ack")
				}
			}
			// time.Sleep(time.Millisecond * 1000)
		}
	}
}
About this issue
- State: closed
- Created 7 years ago
- Comments: 23
Commits related to this issue
- fix #254 Channel.Publish blocks indefinitely — committed to sljeff/amqp by sljeff 3 years ago
- Merge pull request #1 from sljeff/fix-254 fix #254 Channel.Publish blocks indefinitely — committed to shanbay/amqp by fenngwd 3 years ago
- Merge pull request #1 from sljeff/fix-254 fix #254 Channel.Publish blocks indefinitely — committed to jraylan/amqp by jraylan a year ago
Found the problem!
In my test case, this
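confirmationChan := channel.NotifyPublish(make(chan amqp.Confirmation))
should be
confirmationChan := channel.NotifyPublish(make(chan amqp.Confirmation, 1))
The documentation for NotifyPublish says that the confirmation channel needs enough capacity for the outstanding publishings, and that too little buffering can cause a deadlock when other operations run on the Connection or Channel while confirms are in flight.
That seems to be the deadlock I have been hitting. I don’t understand what other operations are being performed that are causing the deadlock, but making the channel buffered did fix my problem.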
@michaelklishin, thanks for helping out. I found this while comparing my code to the producer example.
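For anyone landing here later, the difference between the two make calls can be seen without a broker. The sketch below is only a simplified model and not the library's code: Confirmation is a hypothetical stand-in for amqp.Confirmation, and trySend is a made-up helper that attempts a non-blocking send. It shows that a send into an unbuffered channel cannot complete unless a receiver is already waiting, while a channel with capacity 1 accepts one confirmation immediately.

```go
package main

import "fmt"

// Confirmation is a hypothetical stand-in for amqp.Confirmation,
// so that this sketch runs without the amqp package or a broker.
type Confirmation struct {
	DeliveryTag uint64
	Ack         bool
}

// trySend is an illustrative helper (not part of any library).
// It attempts a non-blocking send and reports whether the channel
// accepted the value right away.
func trySend(ch chan Confirmation, c Confirmation) bool {
	select {
	case ch <- c:
		return true
	default:
		return false
	}
}

func main() {
	c := Confirmation{DeliveryTag: 1, Ack: true}

	// No receiver is waiting, so an unbuffered send would have to block.
	unbuffered := make(chan Confirmation)
	fmt.Println("unbuffered accepted:", trySend(unbuffered, c))

	// Capacity 1 leaves room for one outstanding confirmation.
	buffered := make(chan Confirmation, 1)
	fmt.Println("buffered accepted:", trySend(buffered, c))
}
```

In the test case above there is at most one unconfirmed publish at a time, which is presumably why a capacity of 1 was enough. With more publishes in flight, the capacity would need to grow to match.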