2014-10-01 42 views
1

我正在實現一個線程的機制,讓它有一個包含消息的隊列。該隊列使用java.util.concurrentLinkedBlockingQueue構建。我想要達到的是如下所示。Clojure等待沒有旋轉的條件

Thread with mailbox: 
defn work: 
    * do some stuff 
    * Get the head of the queue (a message): 
     - if it is "hello": 
      <do some stuff> 
      <recur work fn> 
     - if it is "bye": 
      <do some stuff> 
     - if it is none of the above, add the message to the back of queue 
      and restart from "Get the head of the queue" 
    * <reaching this point implies terminating the thread> 

,我試圖用的是* Get the head of the queue纏循環,使用條件檢查郵件,並在:else分支添加到隊列中,如果沒有匹配的條款來實現我最初的想法。其缺點是在cond的條款的任何主體中呼叫recur將總是重複該循環,而使用recur(例如,如在hello的情況下)意味着重複該功能(即,work)。所以這不是一個選擇。另一個缺點是,如果這種信息需要很長時間才能到達,線程將無限期地旋轉並吃掉資源。

我有(但尚未實施)的下一個想法是使用未來。該計劃如下。

* Get all the matches I have to match (i.e., "hello" and "bye") 
* Start a future and pass it the list of messages: 
    * While the queue does not contain any of the messages 
     recur 
    * when found, return the first element that matches. 
* Wait for the future to deliver. 
* if it is "hello": 
    <do some stuff> 
    <recur work fn> 
    if it is "bye": 
    <do some stuff> 

在做這種方式,我得到幾乎我想要的東西:

  1. 在接受"hello""bye"塊,直到我有一張。
  2. 我可以讓條款無限期數量相匹配的消息
  3. 我已經提取的循環行爲成future阻止,這 有很好的副作用,每次我評價我cond 我敢肯定我有一個匹配的消息,不必擔心重試。

我真正想要的一件事,但無法想象如何實現,就是在這種情況下的未來不會旋轉。就目前而言,它會無限期地消耗穿越隊列的寶貴CPU資源,而從不收到它正在尋找的消息可能是完全正常的。

也許放棄LinkedBlockedQueue並將其換算爲具有方法的數據結構是有意義的,例如getEither(List<E> oneOfThese),該方法阻塞,直到其中一個元素可用。

我有一個其他想法,這是我可能用Java做的一種方式,如果隊列中沒有任何元素在隊列中,那麼調用wait()時會有上述getEither()操作。當其他線程在隊列中放入消息時,我可以調用notify(),以便每個線程都會根據他想要的消息列表檢查隊列。

下面的代碼工作正常。但是,它有紡紗問題。這基本上是我想要實現的一個非常基本的例子。

(def queue (ref '())) 

(defn contains-element [elements collection] 
    (some (zipmap elements (repeat true)) collection)) 

(defn has-element 
    [col e] 
    (some #(= e %) col)) 

(defn find-first 
     [f coll] 
     (first (filter f coll))) 

; This function is blocking, which is what I want. 
; However, it spins and thus used a LOT of cpu, 
; whit is *not* what I want.. 
(defn get-either 
    [getthese queue] 
    (dosync 
    (let [match (first (filter #(has-element getthese %) @queue)) 
      newlist (filter #(not= match %) @queue)] 

     (if (not (nil? match)) 
     (do (ref-set queue newlist) 
      match) 
     (Thread/sleep 500) 
     (recur))))) 

(defn somethread 
    [iwantthese] 
    (let [element (get-either iwantthese queue) 
     wanted (filter #(not= % element) iwantthese)] 
    (println (str "I got " element)) 
    (Thread/sleep 500) 
    (recur wanted))) 

(defn test 
    [] 
    (.start (Thread. (fn [] (somethread '(3 4 5))))) 

    (dosync (alter queue #(cons 1 %))) 
    (println "Main: added 1") 
    (Thread/sleep 1000) 

    (dosync (alter queue #(cons 2 %))) 
    (println "Main: added 2") 
    (Thread/sleep 1000) 

    (dosync (alter queue #(cons 3 %))) 
    (println "Main: added 3") 
    (Thread/sleep 1000) 

    (dosync (alter queue #(cons 4 %))) 
    (println "Main: added 4") 
    (Thread/sleep 1000) 

    (dosync (alter queue #(cons 5 %))) 
    (println "Main: added 5")   
) 

任何提示?

(萬一有人注意到,沒錯,這就是像演員和目的是Clojure中的學術目的的實現)

回答

1

您需要2個隊列而不是1個:傳入隊列和「死信」隊列。

  1. A 「線程」 應該從傳入隊列中的阻擋方式(LinkedBlockingQueue.take(),core.async/<!或使用藥劑)讀取。
  2. 如果消息不匹配任何條款:
    1. 運行子句工作:
      1. 放置消息的死隊列的末尾
      2. 轉到1
    2. 如果消息子句匹配
    3. 對於死亡隊列中的每條消息,匹配子句,刪除匹配的消息。
    4. 去1

見下面的兩種實現方法。

代理

代理是非常相似的演員,「唯一」的區別是,你發送的數據/信息演員,但你發送功能劑。一種可能的實現方式是:

(defn create-actor [behaviour] 
    (agent {:dead-queue [] 
      :behaviour behaviour})) 

dead-queue將包含與任何子句不匹配的消息。這基本上是你的「隊列結束」。 behaviour應該是一些match-fn的map/vector來運行。在我的具體實現中,我選擇了一個地圖,其中鍵元件匹配和值是fn到新的項目相匹配時運行:

(def actor (create-actor {3 println 
          4 (partial println "Got a ") 
          5 #(println "Got a " %)})) 

你可能會需要更復雜的behaviour數據結構。唯一重要的是要知道元素是否被處理,所以你知道元素是否必須去死隊列。

要發送消息給演員:

(defn push [actor message] 
    (send actor 
     (fn [state new-message] 
      (if-let [f (get-in state [:behaviour new-message])] 
      (do 
       (f new-message) 
       state) 
      (update-in state [:dead-queue] conj new-message))) 
     message)) 

所以,如果有對behaviour匹配,該消息被立即處理。如果不是,則存儲在死隊列中。如果您希望behaviours不是純函數,您可以在處理新消息後嘗試匹配/處理死隊列中的所有消息。在這個示例實現中,這是不可能的。

我們可以改變的演員behaviour給就死了隊列中的消息有機會進行處理:

(defn change-behaviour [actor behaviour] 
    (send actor 
     (fn [state new-behaviour] 
      (let [to-process (filter new-behaviour (:dead-queue state)) 
       new-dead-queue (vec (remove (set to-process) (:dead-queue state)))] 
      (doseq [old-message to-process 
        :let [f (get new-behaviour old-message)]] 
       (f old-message)) 
      {:behaviour new-behaviour 
      :dead-queue new-dead-queue})) 
     conds)) 

,並用它的一個例子:

(push actor 4) 
(push actor 18) 
(push actor 1) 
(push actor 18) 
(push actor 5) 
(change-behaviour actor {18 (partial println "There was an")}) 

而基於core.async的相同解決方案:

(defn create-actor [behaviour] 
    (let [queue (async/chan)] 
    (async/go-loop [dead-queue [] 
        behaviour behaviour] 
    (let [[type val] (async/<! queue)] 
     (if (= type :data) 
     (if-let [f (get behaviour val)] 
      (do 
      (f val) 
      (recur dead-queue behaviour)) 
      (recur (conj dead-queue val) behaviour)) 
     (let [to-process (filter val dead-queue) 
       new-dead-queue (vec (remove (set to-process) dead-queue))] 
      (doseq [old-msg to-process 
        :let [f (get val old-msg)]] 
      (f old-msg)) 
      (recur new-dead-queue val))))) 
    queue)) 

(defn push [actor message] 
    (async/go 
    (async/>! actor [:data message]))) 

(defn change-behaviour [actor behaviour] 
    (async/go 
    (async/>! actor [:behaviour behaviour]))) 
0

你有沒有考慮過使用core.async?它以輕量級的方式提供您所需要的。

+0

是的,我曾看過。但是我爲了研究目的在Clojure中實現了actor模型。所以我真的需要我自己的實現。快速查看從core.async傳遞的消息告訴我,不可能通過過濾器緩存消息並取出我想要的消息。例如,從所有消息中取出滿足謂詞p的第一條消息。 – 2014-10-01 20:59:17