2016-05-19 46 views
3

我正在嘗試使用clojure core.async通道來節制內存密集型併發進程。每個進程都會將圖像加載到內存中並應用水印。如果我嘗試同時處理太多圖像,則會出現OOM錯誤。使用Clojure的節流進程core.async

下面的模式似乎工作,但它感覺有點不雅。我的問題是,有沒有更好的方法來做到這一點與core.async?或者,我應該只使用java併發的東西來做這件事(即創建一個固定大小的線程池等)。

在下面的代碼的基本概念是使用全球固定尺寸信道,tchan其用於節流什麼進入in-chan,基本上限制併發進程的到的tchan大小的數目。

在下面的代碼中,process-images是入口點。

(def tbuff (buffer 20)) 

(def tchan 
    "tchan is used to throttle the number of processes 
    tbuff is a fixed size buffer" 
    (chan tbuff)) 

(defn accum-results 
    "Accumulates the images in results-chan" 
    [n result-chan] 
    (let [chans [result-chan (timeout timeout-ms)]] 
    (loop [imgs-out [] 
      remaining n] 
     (if (zero? remaining) 
     imgs-out 
     (let [[img-result _] (alts!! chans)] 
      (if (nil? img-result) 
      (do 
       (log/warn "Image processing timed out") 
       (go (dotimes [_ remaining] (<! tchan))) 
       imgs-out) 
      (do 
       (go (<! tchan)) 
       (recur (conj imgs-out img-result) (dec remaining))))))))) 

(defn process-images 
    "Concurrently watermarks a list of images 
    Images is a sequence of maps representing image info 
    Concurrently fetches each actual image and applies the watermark 
    Returns a map of image info map -> image input stream" 
    [images] 
    (let [num-imgs (count images) 
     in-chan (chan num-imgs) 
     out-chan (chan num-imgs)] 
    ;; set up the image-map consumer 
    ;; asynchronously process things found on in-chan 
    (go 
     (dotimes [_ num-imgs] 
     ; block here on input images 
     (let [img-in (<! in-chan)] 
      (thread 
      (let [img-out (watermark/watermarked-image-is img-in)] 
       (>!! out-chan [img-in img-out])))))) 
    ;; put images on in-chan 
    (go 
     (doseq [img images] 
     (>! tchan :x) 
     (>! in-chan img))) 
    ;; accum results 
    (let [results (accum-results num-imgs out-chan)] 
     (log/info (format "Processed %s of %s images and tbuff is %s" 
         (count results) num-imgs (count tbuff))) 
     (into {} results)))) 

回答

2

我相信這正是pipeline的用途。

而且這裏有一個例子:

user> (require '[clojure.core.async :refer [<! <!! chan go go-loop pipeline pipeline-blocking pipeline-async] :as async]) 

user> (let [output (chan) 
      input (async/to-chan (range 10))] 
     (go-loop [x (<! output)] 
      (println x)) 
     (pipeline 4 
        output 
        (map #(do 
          (Thread/sleep (rand-int 200)) 
          (println "starting" %) 
          (Thread/sleep 1000) 
          (println "finished" %) 
          (inc %))) 
        input)) 
#object[clojure.core.async.impl.channels.ManyToManyChannel 0x3f434b5a "[email protected]"] 
user> starting 0 
starting 3 
starting 1 
starting 2 
finished 0 
1 
finished 3 
finished 1 
finished 2 
starting 4 
starting 5 
starting 6 
finished 4 
finished 5 
finished 6 
+0

在我編輯在這裏的例子,這是值得注意的是,映射的調用是一個轉換器,而不是一個懶惰的序列,它只是看起來像一個。 –

相關問題