2016-12-25 105 views
1

我試圖使用kafka的kafka.zk.EmbeddedZookeeperkafka.server.KafkaServerkafka.utils.TestUtils/createServer返回,以運行卡夫卡服務器用於測試目的。未能生成嵌入的卡夫卡經紀人

但是我在試圖發送消息超時時遇到了障礙,並且返回KafkaProducer$Future失敗。以下是我正在使用的kafka版本。下面的代碼是Clojure與Kafka庫的互操作。

[org.apache.kafka/kafka_2.11 "0.10.0.1"] 
[org.apache.kafka/kafka-clients "0.10.1.0"] 

這是我得到多遠。

  • 動物園管理員端口被隨機分配(見here)。
  • 可以使用netcat成功創建一個Zookeeper server並連接到它。
  • 可以成功創建主題。
  • 可以成功創建一個Kafka代理並使用netcat連接到它。
  • 步驟5是過程失敗的地方。

這個SO question表明傳遞正確的Time對象是很重要的。但MockTime看起來像一個合理的實施。任何人都解決過這個問題?

;; 1. Create Zookeeper 
(require '[clojure.test :refer :all] 
    '[kafkaesque.topics :as kt] 
    '[kafkaesque.utils :as ku] 
    '[clojure.pprint :refer [pprint]]) 

(import '[java.nio.file Files] 
    '[kafka.zk EmbeddedZookeeper] 
    '[kafka.server KafkaServer KafkaConfig] 
    '[kafka.utils TestUtils Time MockTime]) 

(def zk-config {:zkhost "127.0.0.1"}) 
(def topic-name "client-test") 
(def ^EmbeddedZookeeper zkServer (EmbeddedZookeeper.)) 


;; 2. Create Kafka Broker 
(def zk-connect-str (str "127.0.0.1" ":" (.port zkServer))) 
(def zku ((ZkUtils/apply (ZkUtils/createZkClient zk-connect-str 10000 8000) false))) 
(def brokerhost "127.0.0.1") 
(def brokerport "9092") 

(def ^KafkaConfig config (KafkaConfig. {"zookeeper.connect" zk-connect-str 
         "broker.id" "0" 
         "log.dirs" (.toString 
          (.toAbsolutePath 
          (Files/createTempDirectory 
          "kafka-" (make-array java.nio.file.attribute.FileAttribute 0)))) 
         "listeners" (str "PLAINTEXT://" brokerhost ":" brokerport)})) 

(def ^Time mock (MockTime.)) 
(def ^KafkaServer kafkaServer (TestUtils/createServer config mock)) 


;; 3. Create a Topic 
(kt/create! zku topic-name 1 1 {}) 
(kt/topic-exists? zku topic-name) ;; returns true 


;; 4. Create a Producer and ProducerRecord 
(def producer-a (kc/producer {"bootstrap.servers" "127.0.0.1:9092" 
       "acks"    "all" 
       "retries"   "0" 
       "batch.size"  "16384" 
       "linger.ms"   "1" 
       "buffer.memory"  "33554432" 
       "key.serializer" "org.apache.kafka.common.serialization.StringSerializer" 
       "value.serializer" "org.apache.kafka.common.serialization.StringSerializer"})) 

(def message-key "k1") 
(def message-value "foobar") 
(def record-a (kc/producer-record topic-name 0 message-key message-value)) 


;; 5. Send a message 
(def send-result (kc/send! producer-a record-a)) ;; Times out, and returns a KafkaProducer$Future failure. 
+0

可以不指定TestUtils.createServer的Time實例,因爲它會創建一個默認的實例。至於'超時',你能否確認它是說讓元數據超時?此外,我注意到客戶端和服務器的版本不匹配。你可以使用同一版本的Kafka重試嗎? – amethystic

+0

@amethystic Crikey,就是這樣 - 版本不匹配。 * org.apache.kafka/kafka_2.11 * **「0.10.0.1」**,而不是* org.apache.kafka/kafka-clients * **「0.10.1.0」**。天哪,我正在拔頭髮!我將所有版本移至**「0.10.1.0」**,並且一切正常。乾杯:) – Nutritioustim

回答

0

感謝去@amethystic指出我的閱讀障礙:)問題是版本不匹配。我用org.apache.kafka/kafka_2.11「0.10.0.1」,與org.apache.kafka /卡夫卡客戶「0.10.1.0」

我將所有版本移動到「0.10.1.0」,並且所有內容都按預期工作。

希望這會有所幫助。

相關問題