2012-10-04 129 views
5

我有一個小型的Clojure消費者/發佈者接收消息,處理消息並通過RabbitMQ將消息發送給其他消費者。Clojure消息處理/異步,多線程

我已經定義了一個消息處理程序,它處理單獨線程中的消息(與主線程分離)。 從下面的代碼可以看出,線程同步接收和發送消息,所有這些都發生在由功能啓動的事件循環中。

所以,問題是,什麼是「Clojure方法」來創建這些同步消息處理程序的N大小的線程池?我猜非Clojure的方式是通過Java interop手動產生多個線程。

另外,考慮到處理不是CPU密集型的,這會加快消息處理的速度嗎?考慮到發佈花費的時間多於處理花費的時間,再次將這些消息處理程序設置爲異步會更好嗎?

最後,我將如何去測量這些競爭方法的性能(我來自Ruby/Javascript世界,並且那裏沒有多線程)?

注意: 我知道這一切可以通過只是水平縮放和產卵多JVM進程聽完消息總線是可以避免的,但由於應用程序將被部署在Heroku上,我想作爲使用每個動態/流程中儘可能多的資源。

(defn message-handler 
    [ch metadata ^bytes payload] 
    (let [msg (json/parse-string (String. payload "UTF-8")) 
     processed-message (process msg)] 
    (lb/publish ch "e.events" "" processed-message))) 

(defn -main 
    [& args] 
    (let [conn   (rmq/connect {:uri (System/getenv "MSGQ")}) 
     ch   (lch/open conn) 
     q-name  "q.events.tagger" 
     e-sub-name "e.events.preproc" 
     e-pub-name "e.events" 
     routing-key "tasks.taggify"] 
    (lq/declare ch q-name :exclusive false :auto-delete false) 
    (le/declare ch e-pub-name "fanout" :durable false) 
    (lq/bind ch q-name e-sub-name :routing-key routing-key) 
    (.start (Thread. (fn [] 
         (lcm/subscribe ch q-name message-handler :auto-ack true)))))) 

在一個更基本的音符......我將如何去重構這個代碼,以支持一個額外的參數註冊消息處理程序回調,像這樣:

(.start (Thread. (fn [] 
         (lcm/subscribe ch q-name (message-handler pub-name) :auto-ack true)))))) 

和然後用參考發佈:

(lb/publish ch pub-name "" processed-message))) 

來代替字面:

(lb/publish ch "e.events" "" processed-message))) 

回答

2

對於問題的第二部分時,可以使用局部應用,如下所示:

(defn message-handler 
    [pub-name ch metadata ^bytes payload] 
    (let [msg (json/parse-string (String. payload "UTF-8")) 
     processed-message (process msg)] 
    (lb/publish ch pub-name "" processed-message))) 



(.start 
    (Thread. 
    (fn [] 
     (lcm/subscribe ch q-name (partial message-handler e-pub-name) :auto-ack true)))))) 
1

這是一個非常大的話題,你可能會考慮把這個問題分解成幾個不同的問題,但簡潔的答案是:use agents

+0

感謝您的提示,就行了。 – neektza