4

試圖編寫一個消費卡夫卡消息的Spark Streaming作業。這裏是我到目前爲止:Spark Streaming中的卡夫卡消費者

1)開始動物園管理員。
2)啓動Kafka服務器。
3)發送幾條消息到服務器。我可以看到他們,當我運行以下命令:

斌/ kafka-console-consumer.sh --zookeeper本地主機:2181 --topic mytopic --from-開始

4)現在想寫一個程序來計算在5分鐘內進入的消息數量。

的代碼看起來是這樣的:

Map<String, Integer> map = new HashMap<String, Integer>(); 
    map.put("mytopic", new Integer(1)); 

    JavaStreamingContext ssc = new JavaStreamingContext(
      sparkUrl, " Spark Streaming", new Duration(60 * 5 * 1000), sparkHome, new String[]{jarFile}); 


    JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "localhost:2181", "1", map); 

不知道什麼樣的價值要使用的第三個參數(消費羣)。當我運行這個時,我得到「無法連接到zookeeper服務器」。但Zookeeper運行在2181端口;否則步驟#3將無法工作。

好像我沒有使用正確KafkaUtils.createStream。有任何想法嗎?

+0

zookeeper是否與Spark一樣運行?您是否試圖通過使用 /current/bin/zkCli.sh連接到Zookeeper來驗證Zookeeper已啓動並運行? – 2014-11-04 00:59:41

+0

我好蠢!我改變了'localhost'到實際的機器名稱並且通過了這個錯誤。但 - 它還沒有工作。任何人都知道卡夫卡下的「消費羣體」的「默認」價值是什麼?它似乎沒有消耗任何消息。 – DilTeam 2014-11-04 06:35:52

+0

我面臨同樣的問題,我沒有收到生產者的任何消息。我正在使用Python生產者。而且我也可以從控制檯用戶那裏獲得msg。 numofparitions在我的配置中也是1。 @DilTeam你是如何解決這個問題的? – Knight71 2015-09-15 11:50:37

回答

2

沒有默認的消費者組。你可以在那裏使用任意的非空字符串。如果你只有一個消費者,它的消費者羣體並不重要。如果有兩個或更多的消費者,他們可以是同一消費羣體的一部分,或屬於不同的消費羣體。

http://kafka.apache.org/documentation.html

Consumers

...

If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers.

If all the consumer instances have different consumer groups, then this works like publish-subscribe and all messages are broadcast to all consumers.

我覺得問題可能出在 '主題' 參數。 從Spark docs

Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread

你只指定了單個分區你的主題,即 '1'。根據代理的設置(num.partitions),可能會有多個分區,並且您的消息可能會發送到其他分區,而這些分區不會被程序讀取。

此外,我相信part​​itionIds是基於0的。所以如果你只有一個分區,它的ID將等於0.

+1

如您所建議的那樣,不確定分區ID是否爲0。當我使用: map.put(「mytopic」,new Integer(0)); 我得到這個錯誤: 錯誤ReceiverTracker:註銷接收器的流0:錯誤啓動接收器0 - java.lang.AssertionError:斷言失敗 – DilTeam 2014-11-05 00:12:07

+0

不應該跟隨代碼打印一些東西? JavaDStream 狀態= tweets.map( 新功能<字符串,字符串>(){ 公共字符串呼叫(字符串狀態){ 的System.out.println(狀態); 返回狀態; } } ); – DilTeam 2014-11-05 00:14:35

-2

我認爲,在你的代碼中,調用 KafkaUtils.createStream的第二個參數應該是kafka服務器的host:port,不是動物園管理員的主機和端口。檢查一次。

編輯: Kafka Utils API Documentation

按照上述文檔,它應該是動物園管理員仲裁。所以應該使用Zookeeper主機名和端口。

zkQuorum 動物園管理員仲裁(主機名:端口,主機名:端口,..)。

+0

如果我給主機:端口,則連接失敗。它只是動物園管理員的主機和端口。 – Knight71 2015-09-15 11:52:02

+0

它只是動物園管理員的名單,即zookeeperQorum – 2016-03-09 11:21:19

0

我想你應該指定的動物園管理員,而不是本地主機的IP地址。另外,第三個參數是針對消費者組的名稱。它可以是任何你喜歡的名字。當你有多個消費者與同一組綁定時,主題分區會相應地分配。您的推文應該是:

JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "x.x.x.x", "dummy-group", map); 
0

我正面臨同樣的問題。這是爲我工作的解決方案。

  • 分配給Spark Streaming應用程序的內核數量必須多於接收器的數量。否則系統會收到數據,但無法處理它。所以Spark Streaming需要至少兩個內核。所以在我的火花提交中,我應該提到至少兩個核心。
  • kafka-clients-version.jar應該包含在spark-submit中相關jar的列表中。
0

如果zookeeper與流應用程序在同一臺計算機上運行,​​那麼「localhost:2181」將起作用。否則,您必須提及zookeeper正在運行的主機的地址,並確保運行流應用程序的計算機能夠與端口2181上的zookeeper主機通信。

+0

不要發表評論爲答案。這應該是評論 – ketan 2016-05-25 06:12:51