2016-04-05 218 views
1

我正在嘗試在Go中編寫RabbitMQ Consumer。假設從隊列中一次取5個對象並處理它們。此外,假設成功處理其他人發送到死信隊列5次然後丟棄,它應該無限運行並處理消費者的取消事件。 我有幾個問題:Go中的RabbitMQ消費者

  1. 是否存在的BasicConsumer VS在EventingBasicConsumer任何概念的RabbitMQ,去Reference
  2. RabbitMQ中的Model是什麼?它在RabbitMq-go中嗎?
  3. 如何時,沒有死信隊列後再次ttl
  4. 重新排隊他們什麼是consumerTag說法在ch.Consume功能在下面的代碼的意義
  5. 我們應該用發送對象channel.Get()channel.Consume()在這種情況下?

爲了滿足上述要求,我需要在下面的代碼中做出什麼樣的改變。我問這是因爲我找不到像樣的RabbitMq-Go文檔。

func main() { 

     consumer()   
    } 

    func consumer() { 

     objConsumerConn := &rabbitMQConn{queueName: "EventCaptureData", conn: nil}  
     initializeConn(&objConsumerConn.conn) 


     ch, err := objConsumerConn.conn.Channel() 
     failOnError(err, "Failed to open a channel") 
     defer ch.Close() 

     msgs, err := ch.Consume(
       objConsumerConn.queueName, // queue 
       "demo1",  // consumerTag 
       false, // auto-ack 
       false, // exclusive 
       false, // no-local 
       false, // no-wait 
       nil, // args 
     ) 
     failOnError(err, "Failed to register a consumer") 

     forever := make(chan bool) 

     go func() { 
      for d := range msgs {     
       k := new(EventCaptureData) 
       b := bytes.Buffer{} 
       b.Write(d.Body) 
       dec := gob.NewDecoder(&b) 
       err := dec.Decode(&k) 
       d.Ack(true) 

       if err != nil { fmt.Println("failed to fetch the data from consumer", err); } 
        fmt.Println(k)       
      } 
     }()  

     log.Printf(" Waiting for Messages to process. To exit press CTRL+C ") 
     <-forever 

    } 

被修改的問題:

我已延遲的消息的處理如在鏈接link1link2建議。但問題是即使在ttl之後,消息也會從死信隊列中恢復到原始隊列。我正在使用RabbitMQ 3.0.0。任何人都可以指出什麼是問題?

+0

嘗試AMQP包與兔子互動,也有一個非常體面的文件https://godoc.org/github.com/streadway/amqp – PerroVerd

+0

@PerroVerd這是我在用。 – Naresh

回答

1

在 RabbitMq-go Reference中是否有BasicConsumer vs EventingBasicConsumer的概念?

不完全是,但Channel.GetChannel.Consume調用服務類似的概念。使用Channel.Get時,如果有任何可用信息,您將獲得第一條消息的非阻塞呼叫,或返回ok=false。通過Channel.Consume排隊的消息被傳遞到一個通道。

什麼是RabbitMQ中的模型,它在RabbitMq-go中有嗎?

如果你指的是在C#中的RabbitMQ的IModelConnection.CreateModel,這是從C#lib中的東西,而不是從自己的RabbitMQ。這只是一個試圖從RabbitMQ「Channel」術語中抽象出來的東西,但它從來沒有出現過。

如何時,沒有死信隊列和TTL後再次 重新排隊它們

使用delivery.Nackrequeue=false發送的對象。

消費者標籤參數在ch中的意義是什麼?在下面的代碼中使用 函數

ConsumerTag只是一個消費者標識符。它可用於取消頻道channel.Cancel,並識別負責傳送的消費者。與channel.Consume一起發送的所有消息都將設置爲ConsumerTag字段。

我們應該在這種情況下使用channel.Get()channel.Consume()嗎?

我認爲channel.Get()幾乎從來沒有比channel.Consume()好。有了channel.Get,你會輪詢隊列並浪費CPU什麼也不做,這在Go中沒有意義。

我需要在下面的代碼中做出什麼修改才能滿足 以上的要求。

  1. 既然你在一個時間是批量處理5,你可以從消費渠道接收,一旦它得到了5個交貨調用另一個函數來處理他們的goroutine。

  2. 要確認或發送到死信隊列,您將使用delivery.Ackdelivery.Nack函數。您可以使用multiple=true併爲批次調用一次。一旦消息進入死信隊列,您必須檢查delivery.Headers["x-death"]標題,瞭解其已死信的次數,並在已經重試5次時呼叫delivery.Reject

  3. 使用channel.NotifyCancel來處理取消事件。

+0

非常感謝你的詳細解釋。 – Naresh

+0

@佩德羅..我有一個問題。如果我使用d.Ack(true)或d.Ack(false),它不會將消息發佈在死信隊列中。在d.Nack(true,false)的情況下發布。但是在ttl之後它會從那裏丟棄消息。那麼,實現相同 – Naresh

+0

我編輯過的問題的價值是什麼。你可以看看它嗎 – Naresh