2017-02-23 97 views
1

我正在Hapi框架中開發node.js服務器。我採用了RabbitMQ(amqp)來排列我的任務。但是,一旦發送請求,而不是立即回覆請求,則會將消息發送到實際功能作爲使用者所在的Rabbit服務器。然後,用戶應該返回結果給(請求,回覆)函數並讓函數回覆它。Node.js在腳本之間傳輸數據

現在我的解決方案是在我的工作文件(其中amqp使用者所在位置)中創建一個變量並將其導出。然後在索引文件(我的主要腳本與路由處理程序)中,我導入變量。一旦收到一些請求,它將發送一條消息給RabbitMQ服務器,服務器將更改該變量。然後,回到索引文件,腳本更新變量的值,然後回覆它。顯然,由於異步,程序會回覆先前請求的結果。

我做了一些研究,發現我們不應該在腳本之間共享一些變量。有沒有人有辦法解決嗎?我的目標是可以將我的amqp用戶放在腳本中。一旦我運行腳本,消費者將準備好接收任何相應的消息。然後在我的索引文件中,一旦收到一個請求,就會向RabbitMQ服務器發送一條消息。然後它應該抓住消費者的結果並回復它。

下面是我的代碼:

index.ts

import * as Joi from "joi"; 
import * as amqp from "amqplib/callback_api"; 
import * as waitUntil from "wait-until"; 

import * as repository from "./repository"; 
import * as worker from "./worker"; 

// defien variables from internal modules 
let greeter = new repository.Greeter(); 

// register type 
import {Register} from "../../interfaces"; 

// define amqp related stuff 
let greeterReply = worker.greeterReply; 

// helloWorld config including handler, validate and auth 
export let register: Register = (server, options, next) => { 
    server.route([ 
    { 
     method: "GET", 
     path: "/greeter", 
     config: { 
      handler: (request, reply) => { 
       let q: string = "greeter"; 
       let requestQuery = request.query; 
       let requestString = JSON.stringify(requestQuery); 
       amqp.connect("amqp://192.168.0.31", (err, conn) => { 
        conn.createChannel((err, ch) => { 
         ch.assertQueue(q, {durable: false}); 
         ch.sendToQueue(q, new Buffer(requestString)); 
        }); 
       }); 
       waitUntil(500, 10, function condition() { 
        greeterReply = worker.greeterReply; 
        return (greeterReply !== null); 
       }, function done(result) { 
        reply(greeterReply); 
        greeterReply = null; 
       }); 
      }, 
      validate: { 
       query: { 
        name: Joi.string(), 
        age: Joi.number() 
       } 
      }, 
     } 
    } 
    ]); 
    next(); 
}; 

register.attributes = { 
    name: "greeter", 
    version: "1.0" 
}; 

worker.ts

// import external modules 
import * as amqp from "amqplib/callback_api"; 

// import internal modules 
import * as repository from "./repository"; 
import * as indexModule from "./index"; 

// defien variables from internal modules 
let greeter = new repository.Greeter(); 

export let greeterReply = null; 

amqp.connect("amqp://192.168.0.31", (err, conn) => { 
    conn.createChannel((err, ch) => { 
     let q: string = "greeter"; 
     ch.assertQueue(q, {durable: false}); 
     ch.consume(q, function (requestString) { 
      let newRequest = JSON.parse(requestString.content.toString()); 
      console.log("replied via amqp"); 
      let result: string = "how are you"; 
      result = greeter.helloWorld(newRequest.name, newRequest.age); 
      console.log("the result is: ", result); 
      greeterReply = result; 
     }, {noAck: true}); 
    }); 
}); 
+0

看看插件來封裝這種類型的東西。還可以在工作者函數中使用回調來觸發和控制處理。通過使用類可以封裝每個請求的狀態,但是如果您打算獲得高吞吐量,這最終可能會成爲問題。 –

+0

如果我使用回調函數,我必須調用索引文件中的函數嗎?這不是我打算做的。我有很多路線可以處理。你能詳細解釋插件方法嗎?我如何將amqp註冊爲插件? – zhangjinzhou

回答

1

你所需要的是一種RPC通過的RabbitMQ的。它由RabbitMQ支持,如tutorials here所示。

您可以使用amqplib自己實現它,您可以使用它,也可以使用特定模塊(如amqp-rpc)爲您完成。

+0

我相信這就是解決方案!比你哥們。但是,如果我沒有使用RPC,我注意到我的程序(上面的代碼)工作正常,如果我不在終端中運行worker。看起來消費者在發佈者發送消息時作爲子進程運行。你知道原因嗎? @tbking – zhangjinzhou

+0

這是因爲你已經導入了工作文件,這意味着你只需要運行索引文件,它就會運行worker文件並調用你的worker。 你的代碼是這樣的,你不需要單獨運行這些文件。 – tbking

+0

謝謝!在這種情況下,你認爲我仍然需要使用RPC嗎?如果是這樣,我可以得到什麼好處? – zhangjinzhou