我正在實現一個線程的機制,讓它有一個包含消息的隊列。該隊列使用java.util.concurrent
的LinkedBlockingQueue
構建。我想要達到的是如下所示。Clojure等待沒有旋轉的條件
Thread with mailbox:
defn work:
* do some stuff
* Get the head of the queue (a message):
- if it is "hello":
<do some stuff>
<recur work fn>
- if it is "bye":
<do some stuff>
- if it is none of the above, add the message to the back of queue
and restart from "Get the head of the queue"
* <reaching this point implies terminating the thread>
,我試圖用的是* Get the head of the queue
纏循環,使用條件檢查郵件,並在:else
分支添加到隊列中,如果沒有匹配的條款來實現我最初的想法。其缺點是在cond
的條款的任何主體中呼叫recur
將總是重複該循環,而使用recur
(例如,如在hello
的情況下)意味着重複該功能(即,work
)。所以這不是一個選擇。另一個缺點是,如果這種信息需要很長時間才能到達,線程將無限期地旋轉並吃掉資源。
我有(但尚未實施)的下一個想法是使用未來。該計劃如下。
* Get all the matches I have to match (i.e., "hello" and "bye")
* Start a future and pass it the list of messages:
* While the queue does not contain any of the messages
recur
* when found, return the first element that matches.
* Wait for the future to deliver.
* if it is "hello":
<do some stuff>
<recur work fn>
if it is "bye":
<do some stuff>
在做這種方式,我得到幾乎我想要的東西:
- 在接受
"hello"
或"bye"
塊,直到我有一張。 - 我可以讓條款無限期數量相匹配的消息
- 我已經提取的循環行爲成
future
阻止,這 有很好的副作用,每次我評價我cond
我敢肯定我有一個匹配的消息,不必擔心重試。
我真正想要的一件事,但無法想象如何實現,就是在這種情況下的未來不會旋轉。就目前而言,它會無限期地消耗穿越隊列的寶貴CPU資源,而從不收到它正在尋找的消息可能是完全正常的。
也許放棄LinkedBlockedQueue
並將其換算爲具有方法的數據結構是有意義的,例如getEither(List<E> oneOfThese)
,該方法阻塞,直到其中一個元素可用。
我有一個其他想法,這是我可能用Java做的一種方式,如果隊列中沒有任何元素在隊列中,那麼調用wait()
時會有上述getEither()
操作。當其他線程在隊列中放入消息時,我可以調用notify()
,以便每個線程都會根據他想要的消息列表檢查隊列。
例
下面的代碼工作正常。但是,它有紡紗問題。這基本上是我想要實現的一個非常基本的例子。
(def queue (ref '()))
(defn contains-element [elements collection]
(some (zipmap elements (repeat true)) collection))
(defn has-element
[col e]
(some #(= e %) col))
(defn find-first
[f coll]
(first (filter f coll)))
; This function is blocking, which is what I want.
; However, it spins and thus used a LOT of cpu,
; whit is *not* what I want..
(defn get-either
[getthese queue]
(dosync
(let [match (first (filter #(has-element getthese %) @queue))
newlist (filter #(not= match %) @queue)]
(if (not (nil? match))
(do (ref-set queue newlist)
match)
(Thread/sleep 500)
(recur)))))
(defn somethread
[iwantthese]
(let [element (get-either iwantthese queue)
wanted (filter #(not= % element) iwantthese)]
(println (str "I got " element))
(Thread/sleep 500)
(recur wanted)))
(defn test
[]
(.start (Thread. (fn [] (somethread '(3 4 5)))))
(dosync (alter queue #(cons 1 %)))
(println "Main: added 1")
(Thread/sleep 1000)
(dosync (alter queue #(cons 2 %)))
(println "Main: added 2")
(Thread/sleep 1000)
(dosync (alter queue #(cons 3 %)))
(println "Main: added 3")
(Thread/sleep 1000)
(dosync (alter queue #(cons 4 %)))
(println "Main: added 4")
(Thread/sleep 1000)
(dosync (alter queue #(cons 5 %)))
(println "Main: added 5")
)
任何提示?
(萬一有人注意到,沒錯,這就是像演員和目的是Clojure中的學術目的的實現)
是的,我曾看過。但是我爲了研究目的在Clojure中實現了actor模型。所以我真的需要我自己的實現。快速查看從core.async傳遞的消息告訴我,不可能通過過濾器緩存消息並取出我想要的消息。例如,從所有消息中取出滿足謂詞p的第一條消息。 – 2014-10-01 20:59:17