2016-11-04 19 views
3

我想弄清楚如何最好地創建一個異步組件,或以一種組件友好的方式適應異步代碼。這是我能想到的最好的,而且......它感覺不太正確。如何改進此Clojure組件+異步示例?

要義:接話,uppercase他們和他們reverse最後print他們。

問題1:我不能讓system停止在最後。我期望看到個人c-chan停止的println,但不。

問題2:我該如何正確注射貼劑。進入producer/consumer fns?我的意思是,它們不是組件,我認爲它們應該是而不是是組件,因爲它們沒有明智的生命週期。

問題3:如何處理慣用命名爲a>b,並b>casync/pipeline -creating副作用? pipeline應該是一個組件嗎?

(ns pipelines.core 
    (:require [clojure.core.async :as async 
      :refer [go >! <! chan pipeline-blocking close!]] 
      [com.stuartsierra.component :as component])) 


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; 
;; PIPELINES 
(defn a>b [a> b>] 
    (pipeline-blocking 4 
        b> 
        (map clojure.string/upper-case) 
        a>)) 
(defn b>c [b> c>] 
    (pipeline-blocking 4 
        c> 
        (map (comp (partial apply str) 
           reverse)) 
        b>)) 


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; 
;; PRODUCER/CONSUMER 
(defn producer [a>] 
    (doseq [word ["apple" "banana" "carrot"]] 
    (go (>! a> word)))) 

(defn consumer [c>] 
    (go (while true 
     (println "Your Word Is: " (<! c>))))) 



;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; 
;; SYSTEM 
(defn pipeline-system [config-options] 
    (let [c-chan (reify component/Lifecycle 
       (start [this] 
        (println "starting chan: " this) 
        (chan 1)) 
       (stop [this] 
        (println "stopping chan: " this) 
        (close! this)))] 
    (-> (component/system-map 
     :a> c-chan 
     :b> c-chan 
     :c> c-chan) 
     (component/using {})))) 


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; 
;; RUN IT! 
(def system (atom nil)) 
(let [_  (reset! system (component/start (pipeline-system {}))) 
     _  (a>b (:a> @system) (:b> @system)) 
     _  (b>c (:b> @system) (:c> @system)) 
     _  (producer (:a> @system)) 
     _  (consumer (:c> @system)) 
     _  (component/stop @system)]) 

編輯

我開始思考以下,但我不能肯定它是否正常關閉...

(extend-protocol component/Lifecycle 
    clojure.core.async.impl.channels.ManyToManyChannel 
    (start [this] 
    this) 
    (stop [this] 
    (close! this))) 

回答

7

我重寫你的榜樣有點讓它可加載

增值的管道

(ns pipeline 
    (:require [clojure.core.async :as ca :refer [>! <!]] 
      [clojure.string :as s])) 

(defn upverse [from to] 
    (ca/pipeline-blocking 4 
         to 
         (map (comp s/upper-case 
            s/reverse)) 
         from)) 
(defn produce [ch xs] 
    (doseq [word xs] 
    (ca/go (>! ch word)))) 

(defn consume [ch] 
    (ca/go-loop [] 
       (when-let [word (<! ch)] 
       (println "your word is:" word) 
       (recur)))) 

(defn start-engine [] 
    (let [[from to] [(ca/chan) (ca/chan)]] 
    (upverse to from) 
    (consume from) 
    {:stop (fn [] 
      (ca/close! to) 
      (ca/close! from) 
      (println "engine is stopped")) 
    :process (partial produce to)})) 

這樣你可以做(start-engine),並用它來處理單詞序列:

REPL時間

boot.user=> (require '[pipeline]) 

boot.user=> (def engine (pipeline/start-engine)) 
#'boot.user/engine 

它運行

boot.user=> ((engine :process) ["apple" "banana" "carrot"]) 

your word is: TORRAC 
your word is: ANANAB 
your word is: ELPPA 

boot.user=> ((engine :process) ["do" "what" "makes" "sense"]) 

your word is: OD 
your word is: SEKAM 
your word is: ESNES 
your word is: TAHW 

停止它

boot.user=> ((:stop engine)) 
engine is stopped 

;; engine would not process anymore 
boot.user=> ((engine :process) ["apple" "banana" "carrot"]) 
nil 

國家管理

取決於你打算如何使用這條管道,可能不會在所有需要像組件的狀態管理框架:沒有必要「以防萬一」添加任何東西,啓動和停止在這種情況下管道是一個調用兩個函數的問題。

但是,如果此管道在具有更多狀態的較大應用程序中使用,您肯定可以從狀態管理庫中受益。

not a fan of Component主要是因爲它需要一個完整的應用程序買入(這使它成爲一個框架),但我不使用它尊重其他人。

安裝

我建議要麼沒有的情況下,使用任何具體的程序是小:你,例如可以構成此管道與其他管道/邏輯和-main踢它關閉,但如果應用程序是任何更大,有更多的國家無關,這裏是所有你需要做的,添加mount它:

(defstate engine :start (start-engine) 
       :stop ((:stop engine))) 

開始管道

boot.user=> (mount/start) 
{:started ["#'pipeline/engine"]} 

boot.user=> ((engine :process) ["do" "what" "makes" "sense"]) 

your word is: OD 
your word is: SEKAM 
your word is: ESNES 
your word is: TAHW 

運行停止它

boot.user=> (mount/stop) 
engine is stopped 
{:stopped ["#'pipeline/engine"]} 

這裏是一個gist with a full example包括build.boot

你可以下載,並通過boot repl


[編輯]用它玩:回答您已經迷上了組件的評論

在情況下,這應該讓你開始:

(defrecord WordEngine [] 
    component/Lifecycle 

    (start [component] 
    (merge component (start-engine))) 

    (stop [component] 
    ((:stop component)) 
    (assoc component :process nil :stop nil))) 

這在開始時會創建一個WordEngine對象,該對象將具有方法:processod

您將無法像調用普通的Clojure函數那樣調用它:即從REPL或任何名稱空間僅由:require開始,除非您將引用傳遞給不推薦的整個系統。

所以爲了調用它,需要將WordEngine插入到一個Component系統中,然後注入到另一個Component中,然後可以解構:process函數並調用它。

+0

Mount + Async的一個很好的例子,就像你說的那樣,對於較小的用例可能是首選。在爲* this *作品安排Component之前,我實際上花了很多時間在Component *和* Mount上,也就是因爲我喜歡「框架」提供的內容。對我來說,這是一個已經「買進」Component的大型系統,我真的很喜歡爲組件運行一個例子,儘管我會因爲這是一個很好的Mount示例而聽到這個聲音! (加上,我喜歡這是'引導',我打算最終切換:) –

+1

我的意思是更小的情況是什麼都沒用:只是Clojure。 Mount目前在很多大型應用程序中都非常成功地使用。事實上,一週前我剛剛聽到一家公司將他們的20K LOC應用程序從Component轉換爲Mount,並且不能更快樂。我在答案中添加了一個示例組件示例。 – tolitius

+1

我非常感謝你們在提倡和支持'Mount'!與我對斯圖爾特塞拉利昂和weavejester(詹姆斯?)對組件的看法,我懷疑他們對這個玩具問題的處理看起來更通用一些,比如'channel'或'pipeline'組件(帶有'boundary's?),而不是'WordEngine',以及補救你徵收的一些(否則有效的)批評。儘管如此,我們確實有權衡取捨,並且你說服我讓「更多」機會,所以非常感謝!我很感激tolitius! –