2017-04-22 29 views
1

我想以下情形使用AMQP保持通過<code>amqp</code>一個長期連接到遠程工作者

服務器調度「開始」行動工作進程模型,具體如下(假設channelaction是以前提供和行動爲START一些有效載荷。)

channel.assertQueue('', { exclusive: true }).then(({ queue }) => { 
    const cId = uuid() 

    channel.consume(queue, (msg) => { 
    if (msg.properties.correlationId === cId) { 
     const response = JSON.parse(msg.content.toString()) 
     console.log('response', response) 
     resolve(response) 
    } 
    }, { noAck: true }) 

    const msg = JSON.stringify(action) 
    channel.sendToQueue(
    QUEUE_NAME, 
    new Buffer(msg), 
    { correlationId: cId, replyTo: queue } 
) 
}, reject) 

工人得到START actioncorrelationIdreplyTo隊列名稱一起,增加了有效載荷的事情要做自己的內部列表,並響應到「0123_S」隊列並執行「S​​TART_SUCCESS」操作。

現在,工作人員將通過其內部要做的事情列表並執行它們,並通過相同的replyTo隊列向服務器發出「更新」操作,因此服務器需要知道要繼續收聽到該隊列進行更新,並且需要知道哪個工作人員正在處理任何特定任務的更新。服務器足夠聰明,可以知道某個特定任務已經啓動,因此在這種情況下不會重新分派。

但是,當它的時間爲工人停止做任務,需要服務器知道要發送一個「STOP」消息給工人。有沒有辦法讓工作人員向服務器發送某種直接的amqp通道給服務器,以便服務器可以使用它發送STOP消息?

回答

1

最簡單的答案似乎是讓工作人員創建一個「回覆」隊列,然後在「START_SUCCESS」消息中將該標識符發送到服務器,並將該標識符發送到位於某處的服務器存儲區。

不過,我覺得很多的RabbitMQ的力量來自於一個事實,即消息不直接發佈到隊列,而是交流,他們的最終目標是通過自己的路由鍵確定。 (按隊列名稱發佈實際上是通過使用路由密鑰作爲隊列名稱的交換機。)如果您不熟悉不同類型的交換,請閱讀the RabbitMQ Getting Started tutorials

在這種情況下,不要想到服務器和工作人員需要知道對方的身份,你可以考慮發佈和訂閱對方的更新。如果一切都發布到交易所,那麼服務器和工作人員實際上並不需要知道任何關於彼此身份的信息。

以下是我看到的工作:

  1. 服務器生成特定作業的唯一ID。
  2. 服務器發佈一個開始消息到交換jobs.new,與所述消息中的路由密鑰進行分類的作業的類型,和作業ID。
  3. 服務器將綁定密鑰設置爲作業ID,將匿名隊列綁定到直接或主題交換jobs.status
  4. 工人啓動並從jobs.ready(或jobs.ready.some_type)接收一條消息。
  5. 工作人員將一個匿名隊列綁定到jobs.control交換,作業ID爲綁定密鑰。
  6. 工作人員啓動任務,並將START_SUCCESS消息發佈到交換jobs.status,並將作業ID作爲路由密鑰。
  7. 服務器接收從它結合在步驟3中的隊列中的消息START_SUCCESS,並更新其狀態該作業。
  8. 工人週期性地發送更新消息到交換jobs.status;再次,路由密鑰與作業ID相匹配,因此服務器會收到該消息。
  9. 當服務器要停止(或修改)正在運行的作業,它發佈一個STOP消息與作業ID作爲路由鍵jobs.control交換。
  10. 工作人員在步驟5中綁定的隊列上接收到此消息,並停止作業。

從RabbitMQ的側面看,你有這些元素:

  • 3交流:
    • jobs.new其中服務器發佈新的就業機會。如果所有工作人員都可以處理所有工作,這可能是一個簡單的扇出交換,或者它可能是一個話題交換,它將其路由到不同類型的工作人員的不同工作隊列中。
    • jobs.status其中更新由工人出版。這將是直接或主題交換,其路由鍵是或包含作業ID。
    • jobs.control其中更新由服務器發佈到控制現有就業崗位。再次,這將是直接或主題交換,其路由鍵是或包含作業ID。
  • 永久隊列:
    • 單個jobs.ready隊列,或不同jobs.ready.some_type隊列,結合到jobs.new交換。
  • 匿名隊列: 每個服務器創建和綁定使用該作業的ID的jobs.status交換工作
    • 一個隊列。另外,服務器進程可以有一個入站流量隊列,只需從接收到的消息中讀取作業ID即可。每由工人創建的工作
    • 一個隊列,並綁定到使用它當前正在處理的作業的ID的jobs.control交換。

注意,您可以附加額外的隊列來這些交流,以獲得流量,例如副本用於記錄或調試。對於主題交換,只需綁定額外的隊列和綁定密鑰#,它將獲得所有消息的副本,而不會中斷任何現有的綁定。

+0

非常感謝。這是一個很棒的答案。 –

相關問題