2
我得到下面的火花流應用程序錯誤,我正在使用kafka輸入流。當我使用套接字時,它工作正常。但是,當我改變爲卡夫卡它給錯誤。任何人都知道爲什麼它會拋出錯誤,我是否需要更改批處理時間並檢查指向時間?Spark Streaming,kafka:java.lang.StackOverflowError
錯誤的StreamingContext:錯誤啓動的背景下,將其標記爲停止 java.lang.StackOverflowError的
我的程序:
def main(args: Array[String]): Unit = {
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val conf = new SparkConf().setAppName("HBaseStream")
val sc = new SparkContext(conf)
// create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sc, Seconds(5))
val brokers = args(0)
val topics= args(1)
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
val inputStream = messages.map(_._2)
// val inputStream = ssc.socketTextStream(args(0), args(1).toInt)
ssc.checkpoint(checkpointDirectory)
inputStream.print(1)
val parsedStream = inputStream
.map(line => {
val splitLines = line.split(",")
(splitLines(1), splitLines.slice(2, splitLines.length).map((_.trim.toLong)))
})
import breeze.linalg.{DenseVector => BDV}
import scala.util.Try
val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
(current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
prev.map(_ +: current).orElse(Some(current))
.flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
})
state.checkpoint(Duration(10000))
state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
}
}