我有兩個不同的函數需要處理兩個非常大的數據集,最後到兩個布爾值。這些價值觀需要共同作出最終結果。我的問題是創建線程的最佳方式是什麼,以便兩個長功能可以同時運行。我的想法是這樣的,與Clojure線程長時間運行的進程並比較它們的返回
(def f (future longProcessOne(data_one)))
(def g (future longProcessTwo(data_two)))
(and @f @g)
但我正在尋找更好的方式來進行這方面的輸入。
我有兩個不同的函數需要處理兩個非常大的數據集,最後到兩個布爾值。這些價值觀需要共同作出最終結果。我的問題是創建線程的最佳方式是什麼,以便兩個長功能可以同時運行。我的想法是這樣的,與Clojure線程長時間運行的進程並比較它們的返回
(def f (future longProcessOne(data_one)))
(def g (future longProcessTwo(data_two)))
(and @f @g)
但我正在尋找更好的方式來進行這方面的輸入。
以下是一個版本,利用單個承諾可以多次交付的事實(儘管只有第一次交付才能成功設定其價值;後續交付只需返回nil
,無副作用)。
(defn thread-and
"Computes logical conjunction of return values of fs, each of which
is called in a future. Short-circuits (cancelling the remaining
futures) on first falsey value."
[& fs]
(let [done (promise)
ret (atom true)
fps (promise)]
(deliver fps (doall (for [f fs]
(let [p (promise)]
[(future
(if-not (swap! ret #(and %1 %2) (f))
(deliver done true))
(locking fps
(deliver p true)
(when (every? realized? (map peek @fps))
(deliver done true))))
p]))))
@done
(doseq [[fut] @fps]
(future-cancel fut))
@ret))
一些測試:
(thread-and (constantly true) (constantly true))
;;= true
(thread-and (constantly true) (constantly false))
;;= false
(every? false?
(repeatedly 100000
#(thread-and (constantly true) (constantly false))))
;;= true
;; prints :foo, but not :bar
(thread-and #(do (Thread/sleep 1000) (println :foo))
#(do (Thread/sleep 3000) (println :bar)))
把亞瑟和A.韋伯的觀點放在一起,你可以使用core.async並在一起的結果,而短路第一falsey返回值:
(defn thread-and
"Call each of the fs on a separate thread. Return logical
conjunction of the results. Short-circuit (and cancel the calls
to remaining fs) on first falsey value returned."
[& fs]
(let [futs-and-cs
(doall (for [f fs]
(let [c (chan)]
[(future (>!! c (f))) c])))]
(loop [futs-and-cs futs-and-cs]
(if (seq futs-and-cs)
(let [[result c] (alts!! (map peek futs-and-cs))]
(if result
(recur (remove #(identical? (peek %) c)
futs-and-cs))
(do (doseq [fut (map first futs-and-cs)]
(future-cancel fut))
false)))
true))))
測試與(constantly false)
和(constantly true)
:
(thread-and (constantly true) (constantly true))
;= true
(thread-and (constantly true) (constantly false))
;= false
;;; etc.
還要注意,短路確實做的工作:
;;; prints :foo before returning false
(thread-and #(do (Thread/sleep 3000) false)
#(do (Thread/sleep 1000) (println :foo)))
;;; does not print :foo
(thread-and #(do (Thread/sleep 3000) false)
#(do (Thread/sleep 7000) (println :foo)))
也可在此[主要](https://gist.github.com/michalmarczyk/5988137)。 –
有沒有可能做到這一點沒有core.async? –
@event_jr:實際上,它只是編輯了答案,包含了基於承諾的方法。也可以在這[主要](https://gist.github.com/michalmarczyk/5991353)。 –
你的方法是相當正常的Clojure代碼。另一種選擇是使用的承諾,或者如果您需要更復雜的處理,你可以考慮使用類似lamina或者如果你覺得自己生活在最前沿you could try core.async:
(ns async-example.core
(:require [clojure.core.async :refer :all])
(defn example []
(let [a (chan) ; a channel for a to report it's answer
b (chan) ; a channel for b to report it's answer
output (chan)] ; a channel for the reporter to report back to the repl
(go (<! (timeout (rand-int 1000))) ; process a
(>! a (rand-nth [true false])))
(go (<! (timeout (rand-int 1000))) ; process b
(>! b (rand-nth [true false])))
(go (>! output (and (<! a) (<! b)))) ; the reporter process
output)) ;return the channe that the result will be sent to
async-example.core> (<!! (go (<! (example))))
false
async-example.core> (<!! (go (<! (example))))
false
async-example.core> (<!! (go (<! (example))))
true
當然,這是矯枉過正您的情況,但這是極大的樂趣反正;-)
在core.async代碼中使用'Thread/sleep'有點邪惡 - 它正在吞噬睡眠的線程池。原生的core.async方法是使用'core.async/timeout',這會在整個過程中停止塊的狀態。 –
是的,這是絕對正確的,我改變了我的代碼使用超時。感謝您指出, –
難道這種做法行不通?你有特別的問題嗎?這對我來說似乎很不錯:) –
雖然它的工作,它不得不完成,如果一個失敗,沒有短路。我正在尋找更快的方式來做同樣的事情。 – ChadJPetersen
@ChadJPetersen不,它不必完成; '和'短路。它不會取消它不使用的那個,但這是一個單獨的問題(如果你願意,你可以取消它)。 –