2014-10-11 61 views
4
package main 

import (
"fmt" 
"github.com/streadway/amqp" 
"time" 
) 

// Every connection should declare the topology they expect 
func setup(url, queue string) (*amqp.Connection, *amqp.Channel, error) { 
    //setup connection 
    conn, err := amqp.Dial(url) 
    if err != nil { 
     return nil, nil, err 
    } 
    //build channel in the connection 
    ch, err := conn.Channel() 
     if err != nil { 
     return nil, nil, err 
    } 
    //queue declare 
    if _, err := ch.QueueDeclare(queue, false, true, false, false, nil); err != nil { 
     return nil, nil, err 
    } 

    return conn, ch, nil 
} 

func main() { 
    //amqp url 
    url := "amqp://guest:[email protected]:5672"; 
    for i := 1; i <= 2; i++ { 
     fmt.Println("connect ", i) 
     //two goroutine 
     go func() { 
      //queue name 
      queue := fmt.Sprintf("example.reconnect.%d", i) 
      //setup channel in the tcp connection 
      _, pub, err := setup(url, queue) 
      if err != nil { 
       fmt.Println("err publisher setup:", err) 
       return 
      } 
      // Purge the queue from the publisher side to establish initial state 
      if _, err := pub.QueuePurge(queue, false); err != nil { 
       fmt.Println("err purge:", err) 
       return 
      } 
      //publish msg 
      if err := pub.Publish("", queue, false, false, amqp.Publishing{ 
       Body: []byte(fmt.Sprintf("%d", i)), 
      }); err != nil { 
       fmt.Println("err publish:", err) 
       return 
      } 
      //keep running 
      for{ 
       time.Sleep(time.Second * 20) 
      } 
     }() 
    } 
    //keep running 
    for { 
     time.Sleep(time.Second * 20) 
    } 
} 

我想只有一個程序和MQ-服​​務器之間的連接,一個連接可以在rabbitmq的go api中支持多個通道嗎?

但有兩個方面,一個連接只能支持一個通道,爲什麼呢?

不能這兩個goroutine共享相同的tcp連接嗎?

套接字描述符可以共享理論中的所有進程線程。

爲什麼兩個goroutine不共享一個套接字但有自己的通道?

手工模型:

model1

在RabbitMQ的真實模型: model2

回答

4

看着它看起來好像你可以調用conn.Channel()多次爲source for the library你喜歡它並通過同一連接創建一個新的通信流。

好吧,我試了一下,這裏的工作的例子......一個夠程,一個連接,兩個通道 我安裝的接收器,然後發送消息,然後從接收通道

,如果你想多讀隊列綁定在一個goroutine中,您可以調用rec.Consume兩次,然後在隊列中進行選擇。

package main 

import (
    "fmt" 
    "github.com/streadway/amqp" 
    "os" 
) 

func main() { 
    conn, err := amqp.Dial("amqp://localhost") 
    e(err) 
    defer conn.Close() 
    fmt.Println("Connected") 
    rec, err := conn.Channel() 
    e(err) 

    fmt.Println("Setup receiver") 
    rq, err := rec.QueueDeclare("go-test", false, false, false, false, nil) 
    e(err) 
    msgs, err := rec.Consume(rq.Name, "", true, false, false, false, nil) 
    e(err) 

    fmt.Println("Setup sender") 
    send, err := conn.Channel() 
    e(err) 
    sq, err := send.QueueDeclare("go-test", false, false, false, false, nil) 
    e(err) 

    fmt.Println("Send message") 
    err = send.Publish("", sq.Name, false, false, amqp.Publishing{ 
     ContentType: "text/plain", 
     Body:  []byte("This is a test"), 
    }) 
    e(err) 

    msg := <-msgs 
    fmt.Println("Received from:", rq, "msg:", string(msg.Body)) 
} 

func e(err error) { 
    if err != nil { 
     fmt.Println(err) 
     os.Exit(1) 
    } 
} 

輸出在我的盒子:在相同的goroutine如果

$ go run rmq.go 
Connected 
Setup receiver 
Setup sender 
Send message 
Received from: {go-test 0 0} msg: This is a test 
+0

一個連接。兩個連接,如果在兩個goroutine.I不知道爲什麼? @DavidB – 2014-10-15 19:30:21

+0

它看起來像連接和通道是線程安全的,你確定你不能使用來自多個goroutines的連接? (再一次,我沒有親自嘗試過) – 2014-10-16 01:33:36

+0

我想表達的是,有多少個goroutines調用了amqp.Dial()來決定tcp連接的數量,主要我想知道的是爲什麼這樣設計?例如只有主線程調用amqp.Dial(),所以有1個tcp連接。我的例子中有2個線程調用amqp.Dial(),所以有2個tcp連接。很抱歉,我沒有表達我的問題清楚嗎?但是您的示例顯示頻道可以共享一個連接。 @DavidB – 2014-10-16 11:25:42

相關問題