
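I get the following error when running a Storm topology in local mode. I have a simple program that checks whether a number is prime. I am using KafkaSpout as the source and Storm to process the data.

Kafka version: 2.10-0.8.2.1
Storm version: 0.9.7
ZooKeeper version: 3.4.6

java.lang.RuntimeException: java.lang.ClassCastException: [B cannot be cast to java.lang.String

Below is my bolt, which checks for prime numbers: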

public class PrimeNumberBolt extends BaseRichBolt 
{ 
    private static final long serialVersionUID = 1L; 
    private OutputCollector collector; 



    public void prepare(Map conf, TopologyContext context, OutputCollector collector) 
    { 
     this.collector = collector; 
    } 

    public void execute(Tuple tuple) 
    { 
     //System.out.println(tuple.getFields()); 
     //System.out.println(tuple.getString(0)); 
     String num = tuple.getString(0); 
     //int number = tuple.getInteger(0); 
     int number = Integer.parseInt(num); 
     //System.out.println("IN Primenumber bolt = "+number); 

     if(isPrime(number)) 
     { 
      System.out.println(number); 

     } 
     collector.ack(tuple); 
    } 

    public void declareOutputFields(OutputFieldsDeclarer declarer) 
    { 
     declarer.declare(new Fields("number")); 
    } 

    private boolean isPrime(int n) 
    { 
     if(n == 1) 
     { 
      return false; 
     } 
     if(n == 2 || n == 3) 
     { 
      return true; 
     } 

     // Is n an even number? 
     if(n % 2 == 0) 
     { 
      return false; 
     } 

     //if not, then just check the odds 
     for(int i=3; i*i<=n; i+=2) 
     { 
      if(n % i == 0) 
      { 
       return false; 
      } 
     } 
     return true; 
    } 
} 

Error:

18156 [Thread-11-prime] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.lang.ClassCastException: [B cannot be cast to java.lang.String
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.daemon.executor$fn__3439$fn__3451$fn__3498.invoke(executor.clj:748) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4] 
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] 
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79] 
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String 
    at backtype.storm.tuple.TupleImpl.getString(TupleImpl.java:112) ~[storm-core-0.9.4.jar:0.9.4]
    at com.geekcap.storm_test.PrimeNumberBolt.execute(PrimeNumberBolt.java:40) ~[classes/:na]
    at backtype.storm.daemon.executor$fn__3439$tuple_action_fn__3441.invoke(executor.clj:633) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.daemon.executor$mk_task_receiver$fn__3362.invoke(executor.clj:401) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.disruptor$clojure_handler$reify__1445.onEvent(disruptor.clj:58) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.4.jar:0.9.4] 
    ... 6 common frames omitted 
18158 [Thread-11-prime] ERROR backtype.storm.daemon.executor -
java.lang.RuntimeException: java.lang.ClassCastException: [B cannot be cast to java.lang.String
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.daemon.executor$fn__3439$fn__3451$fn__3498.invoke(executor.clj:748) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4] 
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] 
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79] 
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String
    at backtype.storm.tuple.TupleImpl.getString(TupleImpl.java:112) ~[storm-core-0.9.4.jar:0.9.4] 
    at com.geekcap.storm_test.PrimeNumberBolt.execute(PrimeNumberBolt.java:40) ~[classes/:na] 
    at backtype.storm.daemon.executor$fn__3439$tuple_action_fn__3441.invoke(executor.clj:633) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.daemon.executor$mk_task_receiver$fn__3362.invoke(executor.clj:401) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.disruptor$clojure_handler$reify__1445.onEvent(disruptor.clj:58) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.4.jar:0.9.4] 
    ... 6 common frames omitted 
18375 [Thread-11-prime] ERROR backtype.storm.util - Halting process: ("Worker died") 
java.lang.RuntimeException: ("Worker died") 
    at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.4.jar:0.9.4] 
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na] 
    at backtype.storm.daemon.worker$fn__4693$fn__4694.invoke(worker.clj:491) [storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.daemon.executor$mk_executor_data$fn__3272$fn__3273.invoke(executor.clj:240) [storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:473) [storm-core-0.9.4.jar:0.9.4] 
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] 
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79] 

Please suggest what changes I need to make in the code. Thanks in advance!

What does tuple.getString(0); return?

It returns a String. In my case the data in the Kafka cluster is stored as strings (such as 232, 12, etc.), so I read each value and parse it to an int.

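Answer

It looks like your Kafka spout is reading the data as byte arrays. "[B" in the exception is the JVM's name for byte[], so tuple.getString(0) fails when it casts the field to String.

Try using the string scheme by setting spoutConfig.scheme as shown below.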

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
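To illustrate the diagnosis, here is a minimal sketch that uses only the JDK and no Storm classes. It reproduces the same kind of ClassCastException by casting a byte[] held in an Object to String, then decodes the bytes instead. The class name `ByteArrayCastDemo`, the helper `decodeField`, and the UTF-8 encoding are assumptions made for this illustration; they are not part of the original code.

```java
import java.nio.charset.StandardCharsets;

final class ByteArrayCastDemo {

    // Hypothetical helper: accepts a field that may arrive either as raw
    // bytes or already as a String, and returns it as a String.
    // Assumes the bytes are UTF-8 encoded text.
    static String decodeField(Object value) {
        if (value instanceof byte[]) {
            return new String((byte[]) value, StandardCharsets.UTF_8);
        }
        return (String) value;
    }

    public static void main(String[] args) {
        // A message payload as raw bytes, held in an Object reference.
        Object raw = "232".getBytes(StandardCharsets.UTF_8);

        // The JVM names the byte[] class "[B", which is what the stack trace shows.
        System.out.println(raw.getClass().getName());

        try {
            String s = (String) raw; // same kind of cast that fails in getString
            System.out.println(s);
        } catch (ClassCastException e) {
            System.out.println("cast failed: " + e.getMessage());
        }

        // Decoding the bytes first makes the value parseable.
        int number = Integer.parseInt(decodeField(raw));
        System.out.println(number);
    }
}
```

In a bolt, the same decoding could in principle be applied to the object returned by tuple.getValue(0), but configuring the spout with StringScheme as suggested here keeps the bolt unchanged.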

This is my Kafka spout:

private static KafkaSpout buildKafkaSentenceSpout() {
    String zkHostPort = "localhost:2181";
    String topic = "cust3";
    String zkRoot = "/prime";
    String zkSpoutId = "prime-spout";
    ZkHosts zkHosts = new ZkHosts(zkHostPort);
    SpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    SpoutConfig spoutCfg = new SpoutConfig(zkHosts, topic, zkRoot, zkSpoutId);
    KafkaSpout kafkaSpout = new KafkaSpout(spoutCfg);
    return kafkaSpout;
}

I have added it, but it throws an error: "Cannot make a static reference to the non-static field SpoutConfig.scheme".

In your case, I think you can declare it last, after spoutCfg has been created, e.g. spoutCfg.scheme = new SchemeAsMultiScheme(new StringScheme());
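The compile error in this exchange comes from a general Java rule: an instance field has to be set on an object, not on the class name. The sketch below shows the ordering with a hypothetical stand-in class, `DemoSpoutConfig`, which only mimics the shape of a config object with a public instance field named `scheme`. It is not the storm-kafka class, and the `Function`-based field type is an assumption chosen so the example runs with the JDK alone.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

final class InstanceFieldDemo {

    // Hypothetical stand-in for a config class with a public instance field.
    static final class DemoSpoutConfig {
        final String topic;
        // Default: hand the payload through unchanged, as raw bytes.
        Function<byte[], Object> scheme = bytes -> bytes;

        DemoSpoutConfig(String topic) {
            this.topic = topic;
        }
    }

    static DemoSpoutConfig buildConfig(String topic) {
        // DemoSpoutConfig.scheme = ...;  // would not compile: scheme is not static

        // 1. Create the instance first.
        DemoSpoutConfig cfg = new DemoSpoutConfig(topic);
        // 2. Then set the field on that instance.
        cfg.scheme = bytes -> new String(bytes, StandardCharsets.UTF_8);
        return cfg;
    }

    public static void main(String[] args) {
        DemoSpoutConfig cfg = buildConfig("cust3");
        Object field = cfg.scheme.apply("12".getBytes(StandardCharsets.UTF_8));
        System.out.println(field instanceof String); // prints "true"
    }
}
```

Applied to the method posted above, this corresponds to moving the scheme assignment below the line that constructs spoutCfg and writing it against spoutCfg rather than SpoutConfig.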
