我正在嘗試在Go中編寫RabbitMQ Consumer。假設從隊列中一次取5個對象並處理它們。此外,假設成功處理其他人發送到死信隊列5次然後丟棄,它應該無限運行並處理消費者的取消事件。 我有幾個問題:Go中的RabbitMQ消費者
- 是否存在的
BasicConsumer
VS在EventingBasicConsumer
任何概念的RabbitMQ,去Reference? - RabbitMQ中的
Model
是什麼?它在RabbitMq-go中嗎? - 如何時,沒有死信隊列後再次
ttl
- 重新排隊他們什麼是
consumerTag
說法在ch.Consume
功能在下面的代碼的意義 - 我們應該用發送對象
channel.Get()
或channel.Consume()
在這種情況下?
爲了滿足上述要求,我需要在下面的代碼中做出什麼樣的改變。我問這是因爲我找不到像樣的RabbitMq-Go文檔。
func main() {
consumer()
}
func consumer() {
objConsumerConn := &rabbitMQConn{queueName: "EventCaptureData", conn: nil}
initializeConn(&objConsumerConn.conn)
ch, err := objConsumerConn.conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
msgs, err := ch.Consume(
objConsumerConn.queueName, // queue
"demo1", // consumerTag
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
k := new(EventCaptureData)
b := bytes.Buffer{}
b.Write(d.Body)
dec := gob.NewDecoder(&b)
err := dec.Decode(&k)
d.Ack(true)
if err != nil { fmt.Println("failed to fetch the data from consumer", err); }
fmt.Println(k)
}
}()
log.Printf(" Waiting for Messages to process. To exit press CTRL+C ")
<-forever
}
被修改的問題:
我已延遲的消息的處理如在鏈接link1link2建議。但問題是即使在ttl之後,消息也會從死信隊列中恢復到原始隊列。我正在使用RabbitMQ 3.0.0
。任何人都可以指出什麼是問題?
嘗試AMQP包與兔子互動,也有一個非常體面的文件https://godoc.org/github.com/streadway/amqp – PerroVerd
@PerroVerd這是我在用。 – Naresh