0
我正在使用clj-kafka
,我試圖在REPL中爲它創建一個core.async
接口。Clojure - core.async接口爲apache kafka
我收到一些消息,但我的結構感覺不對:我無法停止接收消息,或者必須再次啓動go
例程以接收更多消息。
這裏是我的嘗試:
(defn consume [topic]
(let [consume-chan (chan)]
(with-resource [c (consumer config)]
shutdown
(go (doseq [m (messages c "test")]
(>! chan message) ;; should I check the return value?
)))
consume-chan)) ;; is it the right place to return a channel ?
(def consume-chan (consume "test"))
;;(close! consume-chan)
(go (>! consume-chan "hi")) ;; manual test, but I have some messages in Kafka already
(def cons-ch (go
(with-resource [c (consumer config)]
shutdown
(doseq [m (messages c "test")]
(>! consume-chan m))))) ;; should I check something here ?
;;(close! cons-ch)
(def go-ch
(go-loop []
(if-let [km (<! consume-chan)]
(do (println "Got a value in this loop:" km)
(recur))
(do (println "Stop recurring - channel closed")))))
;;(close! go-ch)
如何使用消息的一個core.async
接口一個懶惰的序列?