2015-06-02 54 views
4

我對Apache Storm很陌生並且一直試圖用卡夫卡的三叉戟拓撲結構,即TransactionalTridentKafkaSpout。除了Storm UI之外,所有的工作都很好。即使我沒有向我的主題提供任何數據,Storm UI仍然顯示無效的發射/傳輸值。即使主題中沒有數據,計數也會繼續增加。我試着刪除存儲在飼養員,風暴,卡夫卡的數據/日誌和重建卡夫卡主題和還建立風暴UI不正確的值和毛細管工具

topology.stats.sample.rate: 1.0 

,但仍是問題仍然存在。

而且我還遇到了一個名爲Capillary的工具來監測風暴集羣。 我使用下面的特性

capillary.zookeepers="192.168.125.20:2181" 
capillary.kafka.zkroot="192.168.125.20:/home/storm/kafka_2.11-0.8.2.0" 
capillary.storm.zkroot="192.168.125.20:/home/storm/apache-storm-0.9.3" 

我在這裏用卡夫卡的嵌入式飼養員。 即使這不起作用得到下面的例外。

! @6mbg4bp7l - Internal server error, for (GET) [/] -> 

play.api.Application$$anon$1: Execution exception[[JsonParseException: Unexpected character ('.' (code 46)): Expected space separating root-level values 
at [Source: [email protected]; line: 1, column: 9]]] 
     at play.api.Application$class.handleError(Application.scala:296) ~[com.typesafe.play.play_2.10-2.3.4.jar:2.3.4] 
     at play.api.DefaultApplication.handleError(Application.scala:402) [com.typesafe.play.play_2.10-2.3.4.jar:2.3.4] 
     at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$14$$anonfun$apply$1.applyOrElse(PlayDefaultUpstreamHandler.scala:205) [com.typesafe.play.play_2.10-2.3.4.jar:2.3.4] 
     at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$14$$anonfun$apply$1.applyOrElse(PlayDefaultUpstreamHandler.scala:202) [com.typesafe.play.play_2.10-2.3.4.jar:2.3.4] 
     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) [org.scala-lang.scala-library-2.10.4.jar:na] 
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('.' (code 46)): Expected space separating root-level values 
at [Source: [email protected]; line: 1, column: 9] 
     at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1524) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2] 
     at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:557) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2] 
     at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:475) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2] 
     at com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:495) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2] 
     at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._verifyRootSpace(ReaderBasedJsonParser.java:1178) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2] 

任何一方面的幫助都很棒。提前致謝。

配置和源代碼片段:

final Config config = new Config(); 
    config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 3000); 
    config.setNumWorkers(2); 
    config.put(Config.NIMBUS_HOST, "192.168.125.20"); 
    config.put(Config.NIMBUS_THRIFT_PORT, 6627); 
    config.put(Config.STORM_ZOOKEEPER_PORT, 2181); 
    config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.125.20")); 
    config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384); 
    config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1); 
    config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10); 
    config.put(Config.DRPC_SERVERS, Arrays.asList("192.168.125.20")); 
    config.put(Config.DRPC_PORT, 3772); 

final BrokerHosts zkHosts = new ZkHosts("192.168.125.20"); 
final TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, "Test_Topic", ""); 
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
kafkaConfig.bufferSizeBytes = 1024 * 1024 * 4; 
kafkaConfig.fetchSizeBytes = 1024 * 1024 * 4; 
kafkaConfig.forceFromStart = false; 

final TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig); 
final TridentTopology topology = new TridentTopology(); 
topology.newStream("spout", kafkaSpout) 
     .each(new Fields("str"), new TestFunction(), new Fields("test")) 
     .each(new Fields("str"), new PrintFilter()); 

拓撲摘要圖片: Topology Stats

+0

有沒有使用毛細管,但它看起來像在自己的源代碼一些錯誤的配置file.looking請問你的conf /路由文件看起來? – user2720864

+0

@ user2720864感謝您的回覆。我在我的問題中更新了我的配置設置。 – DMA

+0

我相信這是用毛細管打動的東西。可能值得看看https://github.com/keenlabs/capillary/issues/5 – user2720864

回答

2

你可能看到了什麼,我會打電話的UI metric artifacts of Trident?這些元組也出現在Storm UI的計數器中:

Trident每500毫秒執行一次批處理(默認情況下)。批次涉及到協調消息的所有螺栓以協調批次 (即使批次爲空)。這就是你所看到的。

(來源:Trident Kafka Spout - Ack Count Increasing Even Though No Messages Are Processed

+0

非常感謝您的回覆。是的,這可能就是我在UI中看到的。協調消息不應該顯示爲發射/處理值,這會導致知道實際值時出現混淆。 有沒有一種方法可以獲得發送/處理的消息的實際數量,而忽略了您提到的鏈接中提到的協調消息數量? – DMA

+0

@DMA你有沒有找到解決這個問題的方法? – Sach

+0

@miguno鏈接似乎已過期。你有備用鏈接嗎? – Sach