0
object SparkMain extends App {
System.setProperty("spark.cassandra.connection.host", "127.0.0.1")
val conf = new SparkConf().setMaster("local[2]").setAppName("kafkaspark").set("spark.streaming.concurrentJobs","4")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
val sqlContext= new SQLContext(sc)
val host = "localhost:2181"
val topicList = List("test","fb")
topicList.foreach{
topic=> val lines =KafkaUtils.createStream(ssc, host, topic, Map(topic -> 1)).map(_._2);
//configureStream(topic, lines)
lines.foreachRDD(rdd => rdd.map(test(_)).saveToCassandra("test","rawdata",SomeColumns("key")))
}
ssc.addStreamingListener(new StreamingListener {
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
System.out.println("Batch completed, Total delay :" + batchCompleted.batchInfo.totalDelay.get.toString + " ms")
}
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
println("inside onReceiverStarted")
}
override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
println("inside onReceiverError")
}
override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
println("inside onReceiverStopped")
}
override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
println("inside onBatchSubmitted")
}
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
println("inside onBatchStarted")
}
})
ssc.start()
println("===========================")
ssc.awaitTermination()
}
case class test(key: String)
如果我把任何一個主題在同一時間,然後每個主題的作品。但是當主題列表有多個主題時,在獲得kafka
主題中的DataStream
之後,它將繼續打印「onBatchSubmitted」。處理muitsple kafka主題與單火花流上下文掛起batchSubmitted