Spark Streaming empty RDD problem
I am trying to create a custom streaming receiver that reads from an RDBMS.
val dataDStream = ssc.receiverStream(new inputReceiver())
dataDStream.foreachRDD((rdd: RDD[String], time: Time) => {
  val newdata = rdd.flatMap(x => x.split(","))
  newdata.foreach(println) // This line is the problem: newdata has no records
})
ssc.start()
ssc.awaitTermination()
}
class inputReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("RDBMS data Receiver") {
      override def run() {
        receive()
      }
    }.start()
  }
  def onStop() {
  }
  def receive() {
    val sqlcontext = SQLContextSingleton.getInstance()
    // I assume something is wrong in the following code
    val DF = sqlcontext.read.json("/home/cloudera/data/s.json")
    for (data <- rdd) {
      store(data.toString())
    }
    logInfo("Stopped receiving")
    restart("Trying to connect again")
  }
}
The code runs without errors, but it does not print any of the records from the DataFrame.
I am using Spark 1.6 and Scala.
'for (data <- rdd)' is not how you use an RDD. Also, have you tried printing anything from the DataFrame?
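To make that comment concrete: 'for (data <- rdd)' desugars to 'rdd.foreach(...)', so the 'store' call would run inside a distributed closure rather than on the receiver thread, and a receiver normally runs on an executor where a SQLContext is not meant to be used. A minimal sketch of the usual alternative is to read the source with plain I/O inside the receiver and call 'store' once per record. The names FileLineReceiver, LineSource and readLines are hypothetical, and the sketch assumes a line-oriented text file that is readable on the node where the receiver runs; for a real RDBMS the file read would be replaced by a JDBC query.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical helper: reads all lines of a local file with plain Java I/O,
// so no SparkContext or SQLContext is needed inside the receiver.
object LineSource {
  def readLines(path: String): List[String] =
    Files.readAllLines(Paths.get(path), StandardCharsets.UTF_8).asScala.toList
}

// Hypothetical receiver: pushes each line of the file into Spark Streaming.
class FileLineReceiver(path: String)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // onStart must not block, so the actual reading happens on its own thread.
    new Thread("File line receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  // Nothing to clean up: the reading thread ends once the file is consumed.
  override def onStop(): Unit = {}

  private def receive(): Unit = {
    try {
      LineSource.readLines(path).foreach { line =>
        // store() hands one record to Spark; stop early if the receiver was stopped.
        if (!isStopped()) store(line)
      }
    } catch {
      // Ask Spark to restart the receiver if the read fails.
      case e: Exception => restart("Error reading " + path, e)
    }
  }
}
```

Unlike the code in the question, this sketch does not call 'restart' after a successful read, so the file is pushed once instead of being re-read in a loop.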
http://asyncified.io/2017/02/10/why-you-might-be-misusing-sparks-streaming-api/
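Along the lines the title of that post suggests, the driver side can apply the transformation directly on the DStream and use 'print()' instead of 'foreachRDD' with 'foreach(println)'. 'rdd.foreach(println)' writes to the standard output of whichever executor runs the task, while 'print()' shows the first elements of each batch on the driver. The sketch below is only an illustration: ReceiverDemo, splitFields and run are hypothetical names, the 5 second batch interval is arbitrary, and 'local[2]' assumes a local test run (with 'local' or 'local[1]' the receiver would occupy the only thread and no batches would be processed).

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

object ReceiverDemo {
  // Pure helper so the splitting logic can be checked without a cluster.
  def splitFields(line: String): List[String] = line.split(",").toList

  // Runs a streaming job over any Receiver[String],
  // for example the inputReceiver class from the question.
  def run(receiver: Receiver[String]): Unit = {
    // At least two local threads: one for the receiver, one for processing.
    val conf = new SparkConf().setAppName("ReceiverDemo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    val dataDStream = ssc.receiverStream(receiver)

    // Transform the DStream itself, then print a sample of each batch on the driver.
    dataDStream.flatMap(line => splitFields(line)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

If the batches still come out empty with this setup, the receiver is probably not storing anything, which points back at the 'receive()' method rather than at the output step.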
I tried this in my code: 'dataDStream.foreachRDD((rdd: RDD[String], time: Time) => {0} {val – Jhon