2016-08-25
0

I am new to Kafka and Spark, and I am trying to do some counting, but without success. The details of the problem are below. Thanks! Kafka + Java + Spark Streaming + reduceByKeyAndWindow throws an exception: org.apache.spark.SparkException: Task not serializable.

The code is as follows:

JavaPairDStream<String,Integer> counts = wordCounts.reduceByKeyAndWindow(new AddIntegers(), new SubtractIntegers(), Durations.seconds(8000), Durations.seconds(4000)); 
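For context, `reduceByKeyAndWindow` with an inverse function updates each sliding window incrementally instead of recomputing it from scratch: the new window total is the old total, plus the batches entering the window, minus the batches leaving it. A minimal arithmetic sketch of that update (plain Java, no Spark; `slide` is a hypothetical helper, not a Spark API):

```java
import java.util.List;

public class WindowSketch {
    // Incremental window update: newSum = oldSum + entering - leaving.
    // The addition plays the role of the reduce function (AddIntegers),
    // the subtraction plays the role of the inverse function (SubtractIntegers).
    static int slide(int oldSum, List<Integer> entering, List<Integer> leaving) {
        int in = entering.stream().mapToInt(Integer::intValue).sum();
        int out = leaving.stream().mapToInt(Integer::intValue).sum();
        return oldSum + in - out;
    }

    public static void main(String[] args) {
        // Window covers batch values [1, 2, 3]; it slides to cover [2, 3, 4].
        int oldSum = 1 + 2 + 3;                          // 6
        int newSum = slide(oldSum, List.of(4), List.of(1));
        System.out.println(newSum);                       // 9 = 2 + 3 + 4
    }
}
```

This is why both a reduce function and its inverse must be supplied, and why both must be serializable: Spark ships them to the executors on every batch.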

The exception is as follows:

Exception in thread "Thread-3" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
    at org.apache.spark.streaming.dstream.PairDStreamFunctions.reduceByKeyAndWindow(PairDStreamFunctions.scala:333)
    at org.apache.spark.streaming.dstream.PairDStreamFunctions.reduceByKeyAndWindow(PairDStreamFunctions.scala:299)
    at org.apache.spark.streaming.api.java.JavaPairDStream.reduceByKeyAndWindow(JavaPairDStream.scala:352)
    at KafkaAndDstreamWithIncrement.KDDConsumer.run(KDDConsumer.java:110)
Caused by: java.io.NotSerializableException: KafkaAndDstreamWithIncrement.KDDConsumer

+0

Show us 'AddIntegers' and 'SubtractIntegers' –

+0

Thanks for the suggestion! Before, I was always focused on "how to override reduceByKeyAndWindow", but now I see the error was probably in AddIntegers and SubtractIntegers. I tried it and it worked. Thanks again! –

Answer

0

The code is as follows (define them as static):

static Function2<Integer, Integer, Integer> AddIntegers = new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
    }
};

static Function2<Integer, Integer, Integer> SubtractIntegers = new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer i1, Integer i2) {
        return i1 - i2;
    }
};
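Making the functions static works because a non-static anonymous inner class captures a hidden reference to its enclosing instance, so serializing the function drags the whole outer object (here, KDDConsumer, which is not serializable) along with it; a static field has no such reference. A minimal plain-JDK sketch of the difference (no Spark; `Outer` is a hypothetical stand-in for the non-serializable consumer class):

```java
import java.io.*;

public class CaptureDemo {
    // Functional interface that is serializable, like Spark's Function2.
    interface SerializableOp extends Serializable {
        int apply(int a, int b);
    }

    // Stand-in for a non-serializable enclosing class such as KDDConsumer.
    static class Outer {
        // Created in an instance method: the anonymous class captures
        // a hidden this$0 reference to the enclosing Outer instance.
        Serializable nonStaticFn() {
            return new SerializableOp() {
                @Override public int apply(int a, int b) { return a + b; }
            };
        }

        // Created in a static context: no enclosing instance is captured.
        static SerializableOp staticFn = new SerializableOp() {
            @Override public int apply(int a, int b) { return a + b; }
        };
    }

    // Returns true if the object survives Java serialization.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;                  // the hidden Outer reference is not serializable
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new Outer().nonStaticFn())); // false: drags in Outer
        System.out.println(serializes(Outer.staticFn));            // true
    }
}
```

The same effect can be had by moving the functions into separate top-level classes that implement Serializable, or (on Java 8+) by using lambdas, which do not capture the enclosing instance unless they actually reference it.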