我想在Scala中編寫簡單的Spark代碼。如何在DStream中應用RDD函數,同時在scala中編寫代碼
這裏我得到一個DStream。我成功地能夠打印此DStream。但是當我試圖在此DStream上執行任何種類的「foreach」,「foreachRDD」或「transform」函數時,在執行我的程序期間,我的控制檯正在凍結。在這裏,我沒有收到任何錯誤,但是直到我手動終止eclipse控制檯操作時,控制檯才變得無響應。我在這裏附上代碼。請告訴我我做錯了什麼。
我的主要目標是在DStream上應用RDD操作,並根據我的知識使用「foreach」,「foreachRDD」或「transform」函數將我的DStream轉換爲RDD。
我已經通過使用Java實現了相同。但在斯卡拉我有這個問題。
是否有其他人面臨同樣的問題?如果沒有,那麼請幫助我。由於
我寫的樣本代碼在這裏
object KafkaStreaming {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads) = args
val ssc = new StreamingContext("local", "KafkaWordCount", Seconds(2))
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
val splitLines:DStream[String] = lines.flatMap(_.split("\n"))
val pairAlarm = splitLines.map(
x=>{
//Some Code
val alarmPair = new Tuple2(key, value)
alarmPair
}
)
//pairAlarm.print
pairAlarm.foreachRDD(x=>{
println("1 : "+x.first)
x.collect // When the execution reaching this part its getting freeze
println("2: "+x.first)
})
ssc.start()
ssc.awaitTermination()
}
}
謝謝Serejja。在我看到你的答案後,我意識到了這個錯誤。這真是一個非常愚蠢的問題。 :D謝謝 – 2014-09-01 10:23:47