試圖編寫一個消費卡夫卡消息的Spark Streaming作業。這裏是我到目前爲止:Spark Streaming中的卡夫卡消費者
1)開始動物園管理員。
2)啓動Kafka服務器。
3)發送幾條消息到服務器。我可以看到他們,當我運行以下命令:
斌/ kafka-console-consumer.sh --zookeeper本地主機:2181 --topic mytopic --from-開始
4)現在想寫一個程序來計算在5分鐘內進入的消息數量。
的代碼看起來是這樣的:
Map<String, Integer> map = new HashMap<String, Integer>();
map.put("mytopic", new Integer(1));
JavaStreamingContext ssc = new JavaStreamingContext(
sparkUrl, " Spark Streaming", new Duration(60 * 5 * 1000), sparkHome, new String[]{jarFile});
JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "localhost:2181", "1", map);
不知道什麼樣的價值要使用的第三個參數(消費羣)。當我運行這個時,我得到「無法連接到zookeeper服務器」。但Zookeeper運行在2181端口;否則步驟#3將無法工作。
好像我沒有使用正確KafkaUtils.createStream。有任何想法嗎?
zookeeper是否與Spark一樣運行?您是否試圖通過使用 /current/bin/zkCli.sh連接到Zookeeper來驗證Zookeeper已啓動並運行? –
2014-11-04 00:59:41
我好蠢!我改變了'localhost'到實際的機器名稱並且通過了這個錯誤。但 - 它還沒有工作。任何人都知道卡夫卡下的「消費羣體」的「默認」價值是什麼?它似乎沒有消耗任何消息。 – DilTeam 2014-11-04 06:35:52
我面臨同樣的問題,我沒有收到生產者的任何消息。我正在使用Python生產者。而且我也可以從控制檯用戶那裏獲得msg。 numofparitions在我的配置中也是1。 @DilTeam你是如何解決這個問題的? – Knight71 2015-09-15 11:50:37