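The following code just blocks when it runs. For some reason, map does not seem to return a lazy sequence, and I don't know why. What is the correct way to iterate over a Kafka stream in Clojure?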
(ns testt.consumer
  (:require
    ;internal
    ;external
    [clojure.walk :refer [stringify-keys]]
    [clojure.pprint :as pprint])
  (:import
    [kafka.consumer ConsumerConfig Consumer KafkaStream]
    [kafka.javaapi.consumer ConsumerConnector]
    [kafka.message MessageAndMetadata]
    [java.util Properties])
  (:gen-class))

; internal
(defn hashmap-to-properties
  [h]
  (doto (Properties.)
    (.putAll (stringify-keys h))))

; external
(defn consumer-connector
  [h]
  (let [config (ConsumerConfig. (hashmap-to-properties h))]
    (Consumer/createJavaConsumerConnector config)))

(defn message-stream
  [^ConsumerConnector consumer topic thread-pool-size]
  ;this is dealing only with the first stream, needs to be fixed to support multiple streams
  (let [stream (first (.get (.createMessageStreams consumer {topic thread-pool-size}) topic))]
    (map #(.message %) (iterate (.next (.iterator ^KafkaStream stream))))))
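One possible explanation for the blocking: iterate expects a function and a seed value, but here it receives the result of calling `.next` once. Function arguments are evaluated eagerly, so `.next` runs (and waits for a message) before map ever gets a sequence. The sketch below shows one way to wrap an iterator lazily. It uses a plain `java.util.Iterator` from an `ArrayList` as a stand-in for the Kafka stream iterator so that it runs without a broker; `lazy-iterator-seq` and `lazy-messages` are hypothetical helper names, not part of Kafka or Clojure.

```clojure
(ns example.lazy-iteration
  (:import [java.util ArrayList Iterator]))

(defn lazy-iterator-seq
  "Hypothetical helper: wraps a java.util.Iterator in an unchunked lazy seq.
  hasNext/next are called only when an element is actually requested."
  [^Iterator it]
  (lazy-seq
    (when (.hasNext it)
      (cons (.next it) (lazy-iterator-seq it)))))

(defn lazy-messages
  "Hypothetical helper: lazily applies extract to each element of the iterator."
  [extract ^Iterator it]
  (map extract (lazy-iterator-seq it)))

;; Stand-in for a stream: an ArrayList iterator instead of a Kafka one.
(def sample (ArrayList. ["a" "b" "c"]))

(def upper
  (lazy-messages #(.toUpperCase ^String %) (.iterator ^ArrayList sample)))

;; Elements are pulled from the iterator only as the seq is consumed.
(println (vec upper))
```

Assuming the iterator returned by the KafkaStream implements `java.util.Iterator`, the same helper could be called as `(lazy-messages #(.message %) (.iterator stream))`. The built-in `iterator-seq` does a similar job, though depending on the Clojure version it may read ahead in chunks, which matters when `hasNext` blocks.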
The connector is constructed from a map like this:
{:zookeeper.connect            "10.0.0.1:2181"
 :group.id                     "test-0"
 :thread.pool.size             "1"
 :topic                        "test_topic"
 :zookeeper.session.timeout.ms "1000"
 :zookeeper.sync.time.ms       "200"
 :auto.commit.interval.ms      "1000"
 :auto.offset.reset            "smallest"
 :auto.commit.enable           "true"}
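To check how a keyword-keyed map like this ends up in the `java.util.Properties` passed to ConsumerConfig, here is a small sketch that repeats hashmap-to-properties from the code above and inspects the result without touching Kafka. The namespace name and the trimmed-down map are only for illustration.

```clojure
(ns example.consumer-config
  (:require [clojure.walk :refer [stringify-keys]])
  (:import [java.util Properties]))

(defn hashmap-to-properties
  "Copies a Clojure map into a Properties object, turning keyword keys
  such as :group.id into plain strings such as \"group.id\"."
  [h]
  (doto (Properties.)
    (.putAll (stringify-keys h))))

;; Illustrative subset of the configuration map.
(def config
  {:zookeeper.connect "10.0.0.1:2181"
   :group.id          "test-0"
   :auto.offset.reset "smallest"})

(def props (hashmap-to-properties config))

;; Look up a value by its string key.
(println (.getProperty ^Properties props "group.id"))
```

Because stringify-keys uses the keyword's name, dotted keywords keep their dots and map directly onto property names.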