2016-03-01

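Spark Streaming, Kafka: java.lang.StackOverflowError

I am getting the error below in a Spark Streaming application that uses a Kafka input stream. With a socket input stream it works fine, but as soon as I switch to Kafka it fails. Does anyone know why it throws this error? Do I need to change the batch interval and the checkpoint interval?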

ERROR StreamingContext: Error starting the context, marking it as stopped
java.lang.StackOverflowError

My program:

import breeze.linalg.{DenseVector => BDV}
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.util.Try

def main(args: Array[String]): Unit = {

  // Function to create and set up a new StreamingContext
  def functionToCreateContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val sc = new SparkContext(conf)
    // Create a StreamingContext, the main entry point for all streaming functionality
    val ssc = new StreamingContext(sc, Seconds(5))
    val brokers = args(0)
    val topics = args(1)
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Keep only the message value; the Kafka key is ignored
    val inputStream = messages.map(_._2)
    // val inputStream = ssc.socketTextStream(args(0), args(1).toInt)
    ssc.checkpoint(checkpointDirectory)
    inputStream.print(1)

    // Field 1 of each comma-separated line is the key; fields 2 onward are Long values
    val parsedStream = inputStream.map { line =>
      val splitLines = line.split(",")
      (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
    }

    // Element-wise sum of the previous state and the new arrays for each key.
    // If the summation throws, Try yields None and the key's state is dropped.
    val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
      (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
        prev.map(_ +: current).orElse(Some(current))
          .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
      })

    // Checkpoint the state every 10 seconds (two 5-second batches)
    state.checkpoint(Duration(10000))
    state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
    ssc
  }

  // Get StreamingContext from checkpoint data or create a new one
  val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
  // Start the computation and block until it terminates
  context.start()
  context.awaitTermination()
}
}

Answer


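Try deleting the checkpoint directory.

I'm not sure, but it seems your streaming context could not be recovered from the checkpoint. In any case, that worked for me.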
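
One plausible explanation for why this helps: `StreamingContext.getOrCreate` only calls the setup function when the checkpoint directory holds no checkpoint data. Otherwise it rebuilds the context from what is stored there, so a checkpoint written by an earlier version of the job (for example the socket-based one) may be loaded instead of the new Kafka code. A minimal sketch of clearing the directory before a restart is below. It assumes the checkpoint directory is on the local filesystem; `CheckpointCleanup` and `deleteRecursively` are hypothetical names, not part of Spark. For a directory on HDFS, `hdfs dfs -rm -r <dir>` does the same job.

```scala
import java.nio.file.{Files, Path, Paths}
import scala.collection.JavaConverters._

// Hypothetical helper, not part of Spark: removes a local checkpoint directory.
object CheckpointCleanup {

  /** Recursively deletes `dir` if it exists. Returns true if something was deleted. */
  def deleteRecursively(dir: Path): Boolean = {
    if (!Files.exists(dir)) {
      false
    } else {
      val stream = Files.walk(dir)
      try {
        // Files.walk lists parents before their children, so deleting in
        // reverse order removes files before the directories that hold them.
        val paths = stream.iterator().asScala.toList
        paths.reverse.foreach(p => Files.delete(p))
      } finally {
        stream.close()
      }
      true
    }
  }

  def main(args: Array[String]): Unit = {
    // Assumption: args(0) is the same path the job passes to ssc.checkpoint
    val checkpointDirectory = Paths.get(args(0))
    val removed = deleteRecursively(checkpointDirectory)
    println(s"Removed stale checkpoint: $removed")
  }
}
```

Note that deleting the checkpoint also discards the state accumulated by `updateStateByKey`, so the job starts again from empty state.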