我不確定這個論壇是否正確。 Storm使用Storm KafkaSpout連接器從Kafka話題中消費。它一直工作到現在。現在我們應該連接到一個新的Kafka集羣,其升級版本爲0.10.x
,其版本號爲0.10.x
。使用Storm 0.10.x(KafkaSpout)從卡夫卡0.10.x主題中消費
從風暴文檔(http://storm.apache.org/releases/1.1.0/storm-kafka-client.html)我可以看到,風暴1.1.0
與卡夫卡0.10.x
起支持新的卡夫卡消費者API兼容。但在那種情況下,我將無法在我的最後運行拓撲(如果我錯了,請糾正我)。
有沒有解決這個問題的方法? 我已經看到,即使新的Kafka Consumer API已經刪除了ZooKeeper的依賴關係,但我們仍然可以使用舊的Kafka-console-consumer.sh
通過傳遞--zookeeper
標誌而不是新的–bootstrap-server
標誌(推薦)來使用它的消息。我使用卡夫卡0.9運行這個命令並能夠從託管在卡夫卡的話題消耗0.10.x
當我們試圖連接,提示以下異常:
java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /brokers/topics/mytopic/partitions
at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) ~[stormjar.jar:?]
at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) ~[stormjar.jar:?]
但我們都能夠連接到遠程ZK服務器並驗證該路徑存在:
./zkCli.sh -server remoteZKServer:2181
[zk: remoteZKServer:2181(CONNECTED) 5] ls /brokers/topics/mytopic/partitions
[3, 2, 1, 0]
正如我們上面所看到的,它給我們預期的輸出,因爲主題中有4個分區。
此時有以下問題:
1)它是在所有可能的連接到卡夫卡0.10.x
使用暴風版0.10.x
?有人試過這個嗎?
2)即使我們能夠使用,我們是否需要進行任何代碼更改以便在拓撲關閉/重新啓動的情況下檢索消息偏移量。我這樣問,因爲我們將傳遞Zk羣集細節,而不是舊KafkaSpout版本支持的代理信息。
暗戰選擇這裏,任何指針將高度讚賞
UPDATE:
我們能夠連接,當運行在本地使用eclipse從遠程卡夫卡話題消耗。爲了確保Storm不使用內存中的zk,我們使用了重載的構造函數LocalCluster("zkServer",port)
,它工作正常,我們可以看到數據來了。這導致我們得出結論,版本兼容性可能不是這裏的問題。
但是,在集羣中部署拓撲時仍然沒有運氣。 我們已驗證從風暴箱到zkservers的連通性 該znode似乎也很好..
此時真的需要一些指針,這可能有什麼問題,我們該如何調試?從來沒有使用卡夫卡0.10x之前,所以不知道我們錯過了什麼。
真的很感謝一些幫助和建議