2014-05-15

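The following code just blocks. It seems that, for some reason, map does not return a lazy sequence, and I don't know why. What is the correct way to iterate over a Kafka stream in Clojure?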
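It helps to check the laziness claim in isolation. map and iterate are both lazy, but Clojure evaluates function arguments before it calls the function. In the sketch below, a hypothetical fake-next function with a call counter stands in for a blocking call such as (.next iterator). Building the mapped sequence calls it zero times, and realizing two elements calls it exactly twice.

```clojure
;; Hypothetical stand-in for a blocking call such as (.next iterator):
;; every call bumps a counter, so we can see exactly when it runs.
(def calls (atom 0))
(defn fake-next [] (swap! calls inc) :message)

;; map and repeatedly are lazy: building the sequence calls nothing.
(def lazy-msgs (map name (repeatedly fake-next)))
(println @calls)                     ; prints 0

;; Realizing two elements calls fake-next exactly twice.
(println (doall (take 2 lazy-msgs))) ; prints (message message)
(println @calls)                     ; prints 2

;; By contrast, in (iterate (fake-next)) the argument (fake-next) is
;; evaluated first, before iterate runs at all. Note also that iterate
;; takes two arguments: (iterate f x) => x, (f x), (f (f x)), ...
```

So map is not the culprit. Any blocking call written directly as an argument runs immediately.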

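(ns testt.consumer
  (:require
    ;; internal
    ;; external
    [clojure.walk :refer [stringify-keys]]
    [clojure.pprint :as pprint])
  (:import
    [kafka.consumer Consumer ConsumerConfig KafkaStream]
    [kafka.javaapi.consumer ConsumerConnector]
    [kafka.message MessageAndMetadata]
    [java.util Iterator Properties])
  (:gen-class))

;; internal

(defn hashmap-to-properties
  "Converts a map with keyword keys into java.util.Properties."
  [h]
  (doto (Properties.)
    (.putAll (stringify-keys h))))

(defn- iterator->lazy-seq
  "Lazily walks a java.util.Iterator: .hasNext/.next are called only
  when the next element is requested, one message at a time."
  [^Iterator it]
  (lazy-seq
    (when (.hasNext it)
      (cons (.next it) (iterator->lazy-seq it)))))

;; external

(defn consumer-connector
  [h]
  (let [config (ConsumerConfig. (hashmap-to-properties h))]
    (Consumer/createJavaConsumerConnector config)))

(defn message-stream
  [^ConsumerConnector consumer topic thread-pool-size]
  ;; this deals only with the first stream; needs to be fixed to support multiple streams
  ;; createMessageStreams expects a Map<String, Integer>, so thread-pool-size
  ;; must be a number and is coerced with (int ...)
  (let [^KafkaStream stream (first (.get (.createMessageStreams consumer {topic (int thread-pool-size)})
                                         topic))]
    ;; (iterate (.next ...)) evaluated .next eagerly, blocking until the first
    ;; message arrived, and called iterate with one argument instead of two.
    ;; Wrapping the stream's iterator in a lazy seq keeps map lazy.
    (map #(.message ^MessageAndMetadata %)
         (iterator->lazy-seq (.iterator stream)))))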
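The original message-stream has two separate problems. First, (.next ...) is evaluated before iterate is even called, so it blocks until the first message arrives. Second, iterate expects a function and a seed, not a single value. Wrapping the stream's iterator in a hand-written lazy-seq avoids both.

The sketch below runs without a broker because it uses a hypothetical counting-iterator in place of Kafka's stream iterator. The name iterator->lazy-seq is also chosen just for this example. The helper is written by hand rather than with iterator-seq for a specific reason: since Clojure 1.7, iterator-seq builds a chunked sequence that reads up to 32 elements at a time. On a blocking Kafka iterator, that could stall until 32 messages have arrived.

```clojure
(require '[clojure.string :as str])

(defn iterator->lazy-seq
  "Lazily walks a java.util.Iterator: .hasNext/.next are called only when
  the next element is requested, one element at a time (no chunking)."
  [^java.util.Iterator it]
  (lazy-seq
    (when (.hasNext it)
      (cons (.next it) (iterator->lazy-seq it)))))

;; Hypothetical stand-in for Kafka's blocking stream iterator:
;; it records how many times .next has been called.
(def next-calls (atom 0))

(defn counting-iterator [items]
  (let [^java.util.Iterator inner (.iterator ^java.util.List (vec items))]
    (reify java.util.Iterator
      (hasNext [_] (.hasNext inner))
      (next [_] (swap! next-calls inc) (.next inner)))))

(def msgs (map str/upper-case
               (iterator->lazy-seq (counting-iterator ["a" "b" "c" "d"]))))

(println @next-calls)   ; prints 0: building the seq reads nothing
(println (first msgs))  ; prints A
(println @next-calls)   ; prints 1: only the first element was read
```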

The connector expects a config map like this:

{:zookeeper.connect             "10.0.0.1:2181"
 :group.id                      "test-0"
 :thread.pool.size              "1"
 :topic                         "test_topic"
 :zookeeper.session.timeout.ms  "1000"
 :zookeeper.sync.time.ms        "200"
 :auto.commit.interval.ms       "1000"
 :auto.offset.reset             "smallest"
 :auto.commit.enable            "true"}
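Here is a quick way to see what hashmap-to-properties hands to ConsumerConfig. stringify-keys turns each keyword key into its name, so :group.id becomes "group.id". Properties.getProperty only returns String values, and a numeric value would read back as nil, which is likely why every value in the map above is written as a string. The sketch copies hashmap-to-properties from the question and uses a subset of the config keys.

```clojure
(require '[clojure.walk :refer [stringify-keys]])
(import '[java.util Properties])

;; Same conversion as hashmap-to-properties in the question:
;; keyword keys become strings via `name`, e.g. :group.id -> "group.id".
(defn hashmap-to-properties [h]
  (doto (Properties.)
    (.putAll (stringify-keys h))))

(def props
  (hashmap-to-properties {:zookeeper.connect "10.0.0.1:2181"
                          :group.id          "test-0"
                          :auto.offset.reset "smallest"}))

(println (.getProperty props "group.id"))           ; prints test-0
(println (.getProperty props "zookeeper.connect"))  ; prints 10.0.0.1:2181

;; getProperty ignores non-String values, so a numeric 1 reads back as nil.
(println (.getProperty (hashmap-to-properties {:thread.pool.size 1})
                       "thread.pool.size"))         ; prints nil
```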

Answer

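There is actually no need to drive Kafka's iterator by hand, because Clojure offers better ways to process streams. You can treat a Kafka stream as a sequence and simply do: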

(doseq [^kafka.message.MessageAndMetadata message stream]
  (do-some-stuff message))

This is probably the most efficient approach.
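The doseq above works because a KafkaStream is presumably a java.lang.Iterable, so seq can walk it like any Java collection. Below is a minimal runnable sketch. An ArrayList stands in for the stream, and do-some-stuff is a hypothetical handler that just records each message it receives.

```clojure
(import '[java.util ArrayList])

;; Hypothetical stand-in for a KafkaStream: any java.lang.Iterable
;; can be consumed directly by doseq.
(def stream (ArrayList. ["m1" "m2" "m3"]))

(def processed (atom []))

(defn do-some-stuff
  "Hypothetical handler: records each message it receives."
  [message]
  (swap! processed conj message))

(doseq [message stream]
  (do-some-stuff message))

(println @processed) ; prints [m1 m2 m3]
```

One caveat applies on Clojure 1.7 and later. There, seq over an Iterable is chunked, in chunks of up to 32 elements. Against a blocking Kafka stream, doseq may therefore wait for a full chunk before it handles the first message. If that matters, walk (.iterator stream) one element at a time with a small lazy-seq helper instead.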

More code is here:

https://github.com/l1x/shovel
