2015-09-14 84 views
0

我正在使用clj-kafka,我試圖在REPL中爲它創建一個core.async接口。Clojure - core.async接口爲apache kafka

我收到一些消息,但我的結構感覺不對:我無法停止接收消息,或者必須再次啓動go例程以接收更多消息。

這裏是我的嘗試:

(defn consume [topic] 
    (let [consume-chan (chan)] 
    (with-resource [c (consumer config)] 
     shutdown 
     (go (doseq [m (messages c "test")] 
        (>! chan message) ;; should I check the return value? 
        ))) 
    consume-chan)) ;; is it the right place to return a channel ? 


    (def consume-chan (consume "test")) 
    ;;(close! consume-chan) 

    (go (>! consume-chan "hi")) ;; manual test, but I have some messages in Kafka already 

    (def cons-ch (go 
       (with-resource [c (consumer config)] 
        shutdown 
        (doseq [m (messages c "test")] 
        (>! consume-chan m))))) ;; should I check something here ? 

    ;;(close! cons-ch) 

    (def go-ch 
    (go-loop [] 
     (if-let [km (<! consume-chan)] 
     (do (println "Got a value in this loop:" km) 
       (recur)) 
     (do (println "Stop recurring - channel closed"))))) 

    ;;(close! go-ch) 

如何使用消息的一個core.async接口一個懶惰的序列?

回答

0

這裏是我會做什麼:

  • >!<!回報爲零,如果該通道是封閉的,所以要確保退出循環當這種情況發生 - 這樣你可以很容易地從外部結束循環通過關閉頻道。

  • 使用try/catch檢查go塊內的異常,並將任何異常作爲返回值使其不會丟失。

  • 檢查讀取值的例外情況,以捕獲通道內的任何內容。

  • go塊返回一個通道,塊內代碼的返回值(比如上面的例外)將放在通道上。檢查這些渠道的例外情況,可能會重新拋出。

現在,您可以寫這樣的信道:

(defn write-seq-to-channel 
    [channel 
    values-seq] 
    (a/go 
    (try 
     (loop [values values-seq] 
     (when (seq values) 
      (when (a/>! channel (first values)) 
      (recur (rest values))))) 
     (catch Throwable e 
     e)))) 

,你這樣寫的:

(defn read-from-channel-and-print 
    [channel] 
    (a/go 
    (try 
     (loop [] 
     (let [value (a/<! channel)] 
      (when value 
      (when (instance? Throwable value) 
       (throw value)) 
      (println "Value read:" value) 
      (recur)))) 
     (catch Throwable e 
     e)))) 

現在,您將有兩個通道,所以使用類似alts!alts!!檢查您的循環退出。完成後關閉頻道。