2017-05-25 71 views
0

我不確定這個論壇是否正確。 Storm使用Storm KafkaSpout連接器從Kafka話題中消費。它一直工作到現在。現在我們應該連接到一個新的Kafka集羣,其升級版本爲0.10.x,其版本號爲0.10.x使用Storm 0.10.x(KafkaSpout)從卡夫卡0.10.x主題中消費

從風暴文檔(http://storm.apache.org/releases/1.1.0/storm-kafka-client.html)我可以看到,風暴1.1.0與卡夫卡0.10.x起支持新的卡夫卡消費者API兼容。但在那種情況下,我將無法在我的最後運行拓撲(如果我錯了,請糾正我)。

有沒有解決這個問題的方法? 我已經看到,即使新的Kafka Consumer API已經刪除了ZooKeeper的依賴關係,但我們仍然可以使用舊的Kafka-console-consumer.sh通過傳遞--zookeeper標誌而不是新的–bootstrap-server標誌(推薦)來使用它的消息。我使用卡夫卡0.9運行這個命令並能夠從託管在卡夫卡的話題消耗0.10.x

當我們試圖連接,提示以下異常:

java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /brokers/topics/mytopic/partitions 
at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) ~[stormjar.jar:?] 
at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) ~[stormjar.jar:?] 

但我們都能夠連接到遠程ZK服務器並驗證該路徑存在:

 ./zkCli.sh -server remoteZKServer:2181 

     [zk: remoteZKServer:2181(CONNECTED) 5] ls /brokers/topics/mytopic/partitions 
     [3, 2, 1, 0] 

正如我們上面所看到的,它給我們預期的輸出,因爲主題中有4個分區。

此時有以下問題:

1)它是在所有可能的連接到卡夫卡0.10.x使用暴風版0.10.x?有人試過這個嗎?

2)即使我們能夠使用,我們是否需要進行任何代碼更改以便在拓撲關閉/重新啓動的情況下檢索消息偏移量。我這樣問,因爲我們將傳遞Zk羣集細節,而不是舊KafkaSpout版本支持的代理信息。

暗戰選擇這裏,任何指針將高度讚賞

UPDATE:
我們能夠連接,當運行在本地使用eclipse從遠程卡夫卡話題消耗。爲了確保Storm不使用內存中的zk,我們使用了重載的構造函數LocalCluster("zkServer",port),它工作正常,我們可以看到數據來了。這導致我們得出結論,版本兼容性可能不是這裏的問題。

但是,在集羣中部署拓撲時仍然沒有運氣。 我們已驗證從風暴箱到zkservers的連通性 該znode似乎也很好..

此時真的需要一些指針,這可能有什麼問題,我們該如何調試?從來沒有使用卡夫卡0.10x之前,所以不知道我們錯過了什麼。

真的很感謝一些幫助和建議

回答

0

風暴0.10x與卡夫卡0.10x兼容。我們仍然可以使用舊的KafkaSpout,這取決於基於zookeeper的偏移量存儲機制。

連接丟失異常即將到來,因爲我們試圖到達不允許/接受來自我們端的連接的遠程Kafka集羣。我們需要打開特定的防火牆端口,以便建立連接。看起來,雖然運行拓撲結構是集羣模式,但所有管理節點都應該能夠與動物園管理員通話,所以防火牆應該對每個節點開放。