2017-05-04 151 views
0

產生的原因:了java.lang.RuntimeException: java.io.NotSerializableException: io.netty.channel。 DefaultChannelHandlerContext在 org.apache.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:41) 〜[風暴芯1.0.1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245] at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) 〜[kryo-3.0.3.jar :?] at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:113 ) 〜[kryo-3.0.3.jar :?] at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39) 〜[kryo-3.0.3.jar :?] at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) 〜[kryo-3.0.3.jar :?] at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) 〜[kryo-3.0.3.jar :?] at com。 (Kryo.java:534) 〜 KRYO-3.0.3.jar:?]在 org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44) 〜[風暴芯1.0.1.2.5.0.0-1 245.jar:1.0.1.2.5.0.0-1245]在 org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) 〜[風暴芯1.0.1.2.5.0.0-1245的.jar:1.0.1.2.5.0.0-1245]在 org.apache.storm.daemon.worker $ $ mk_transfer_fn transfer_fn__6723.invoke(worker.clj:192) 〜[風暴芯1.0.1.2.5.0。 0-1245.jar:1.0.1.2.5.0.0-1245] at org.apache.storm.daemon.executor $ start_batch_transfer__GT_worker_handler_BANG_ $ fn__6411.invoke(executor.clj:313) 〜[storm-core-1.0.1.2 .5.0.0-1245.jar:1.0.1.2.5.0.0-1245]在 org.apache.storm.disruptor $ $ clojure_handler reify__6005.onEvent(disruptor.clj:40) 〜[風暴核-1.0。 1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245] org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue。的java:451) 〜[風暴芯1.0.1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245] ... 6更產生的原因:了java.lang.RuntimeException:java.io.NotSerializableException:io.netty.channel.DefaultChannelHandlerContext

我使用風暴本地模式是沒有問題的,但在集羣上會報故障。

這是我的代碼:

public class NettySpout extends BaseRichSpout { 

private static final long serialVersionUID = 1L; 
/** 
* colloctor for spout 
*/ 
private SpoutOutputCollector collector; 

@Override 
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { 
    collector=spoutOutputCollector; 
    StormServer stormServer=new StormServer(); 
    stormServer.run(); 
} 

@Override 
public void nextTuple() { 
    Values tuple; 
    try { 
     while ((tuple = ServerHandler.queue.take()) != null) { 
      collector.emit(tuple); 
     } 
    } catch (Exception e) { 
    } 
} 

@Override 
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { 
    outputFieldsDeclarer.declare(new Fields("value","channl")); 
} 


public class ServerHandler extends ChannelInboundHandlerAdapter{ 

private static Logger logger = LogManager.getLogger(ServerHandler.class); 
public static LinkedBlockingQueue<Values> queue = new LinkedBlockingQueue<Values>(); 
public static Map<String,ChannelHandlerContext> ctxes; 

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
    JSONObject message = (JSONObject) msg; 
    queue.put(new Values(new StreamData(message.toString().getBytes()), new HashMap<>(ctxes))); 

} 

回答

1

我沒有多少知識約風暴本身,而是好像你嘗試連載ChannelHandlerContext(作爲其存儲在您的地圖),這是不可序列化。

+0

你是對的,ChannelHandlerContext無法序列,也許這樣是不對的,我應該嘗試其他way.Thank你的回答。 – Tdz

相關問題