2016-04-01 76 views
4

我正在編寫客戶端以使用kafka 0.9。我想知道如何創建一個主題。這個答案:How to create a Topic in Kafka through Java與我所要求的相似。除此之外,該解決方案僅適用於與Kafka 0.9的API截然不同的Kafka 0.8.2。爲Apache Kafka創建主題0.9使用Java

+0

換句話說,您嘗試過類似的解決方案,它不起作用,是嗎?請描述你嘗試過的以及遇到的問題。 –

+0

我正在爲一家公司工作。它運行卡夫卡0.8.2。我基於鏈接實現了它。現在該公司希望升級到卡夫卡0.9。我需要一個快速解決方案將我的代碼升級到0.9。 –

回答

8

在網上查看scala api和各種鏈接之後。

這是我找到了解決辦法:

Maven依賴:

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.9.0.0</version> 
</dependency> 
<dependency> 
    <groupId>com.101tec</groupId> 
    <artifactId>zkclient</artifactId> 
    <version>0.7</version> 
</dependency> 

代碼:

import java.util.Properties; 

import org.I0Itec.zkclient.ZkClient; 
import org.I0Itec.zkclient.ZkConnection; 

import kafka.admin.AdminUtils; 
import kafka.utils.ZKStringSerializer$; 
import kafka.utils.ZkUtils; 

public class KafkaJavaExample { 

    public static void main(String[] args) { 
     String zookeeperConnect = "zkserver1:2181,zkserver2:2181"; 
     int sessionTimeoutMs = 10 * 1000; 
     int connectionTimeoutMs = 8 * 1000; 

     ZkClient zkClient = new ZkClient(
      zookeeperConnect, 
      sessionTimeoutMs, 
      connectionTimeoutMs, 
      ZKStringSerializer$.MODULE$); 

     // Security for Kafka was added in Kafka 0.9.0.0 
     boolean isSecureKafkaCluster = false; 
     // ZkUtils for Kafka was used in Kafka 0.9.0.0 for the AdminUtils API 
     ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster); 

     String topic = "my-topic"; 
     int partitions = 2; 
     int replication = 3; 

     // Add topic configuration here 
     Properties topicConfig = new Properties(); 

     AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig); 
     zkClient.close(); 
    } 
} 

如果你想知道爲什麼下面的代碼看起來並不像Java:

ZKStringSerializer$.MODULE$ 

這是因爲ZkStringSerializer是一個Scala對象。 你可以閱讀這裏的更多信息:

How create Kafka ZKStringSerializer in Java?

注意:必須以ZKStringSerializer初始化ZkClient。
如果你不這樣做,那麼createTopic()似乎只會工作 (換句話說:它會返回沒有錯誤)。
該主題將只存在於Zookeeper中,僅在列出主題時纔有效。下面 即list命令工作正常

bin/kafka-topics.sh --list --zookeeper localhost:2181 

但卡夫卡本身並不創造話題。 爲了說明,下面的描述命令會引發錯誤。

bin/kafka-topics.sh --describe --zookeeper localhost:2181 

因此,請確保使用ZKStringSerializer $ .MODULE $進行初始化。

參考文獻: How Can we create a topic in Kafka from the IDE using API來自該IDE-使用的API

不久慈龍,多倫多

大學
+1

原始鏈接參考是[http://stackoverflow.com/questions/16946778/how-can-we-create-a-topic-in-kafka-from-the-ide-using-api](http:// stackoverflow.com/questions/16946778/how-can-we-create-a-topic-in-kafka-from-the-ide-using-api)不是http://www.askdaima.com/question/0e3b996eda49e3e4 –

+0

@ JayaAnanthram 固定,謝謝。我不知道原始鏈接參考。 –

8

我想不久之後慈龍的回答與卡夫卡0.9.0.1但不得不做一個改變。 ZKStringSerializer現在是私有的。要創建ZkUtils,我使用了以下API(它在內部創建一個ZkClient):

ZkUtils.apply(
    "zookeeper1:port1,zookeeper2:port2", 
    sessionTimeoutMs, 
    connectionTimeoutMs, 
    false) 
+0

感謝您的支持,它也與Kafka 0.10.0.1合作 – Prashant