2017-02-14

Spark Streaming empty RDD problem: I am trying to create a custom streaming receiver for an RDBMS.

val dataDStream = ssc.receiverStream(new inputReceiver())
dataDStream.foreachRDD((rdd: RDD[String], time: Time) => {
  val newdata = rdd.flatMap(x => x.split(","))
  newdata.foreach(println) // *******This line has problem, newdata has no records
})

ssc.start()
ssc.awaitTermination()
}

class inputReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("RDBMS data Receiver") {
      override def run() {
        receive()
      }
    }.start()
  }

  def onStop() {
  }

  def receive() {
    val sqlcontext = SQLContextSingleton.getInstance()

    // **** I am assuming something wrong in following code
    val DF = sqlcontext.read.json("/home/cloudera/data/s.json")
    for (data <- rdd) {
      store(data.toString())
    }
    logInfo("Stopped receiving")
    restart("Trying to connect again")
  }
}

The code runs without errors, but none of the records from the DataFrame are printed.

I am using Spark 1.6 with Scala.

+0

`for (data <- rdd)` is not how you use an RDD. Also, have you tried printing the DataFrame itself? –

+1

http://asyncified.io/2017/02/10/why-you-might-be-misusing-sparks-streaming-api/ –

+0

I tried in my code `dataDStream.foreachRDD((rdd: RDD[String], time: Time) => { val` – Jhon

Answer

To make your code work, you should change it as follows:

def receive() {
  val sqlcontext = SQLContextSingleton.getInstance()
  val DF = sqlcontext.read.json("/home/cloudera/data/s.json")

  // **** this: iterate over the DataFrame's underlying RDD
  // (there is no standalone `rdd` in scope here)
  DF.rdd.collect.foreach(data => store(data.toString()))

  logInfo("Stopped receiving")
  restart("Trying to connect again")
}

However, this is not advisable, because all the data in your JSON file would be handled by the driver, and the receiver is not given proper reliability considerations.
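For context, "proper reliability considerations" in the Receiver API usually means storing a whole block of records at once and only acknowledging the source after `store` returns. A minimal sketch of such a receive loop, where `fetchBatch` and `ack` are hypothetical helpers standing in for your RDBMS access code:

```scala
// Sketch of a reliable receive loop. store(iterator), isStopped() and
// restart(msg, error) are real Receiver API methods; fetchBatch/ack are
// hypothetical placeholders for the actual source.
def receive(): Unit = {
  try {
    while (!isStopped()) {
      val batch: Seq[String] = fetchBatch() // hypothetical: pull a batch of rows
      store(batch.iterator)                 // returns after data is stored reliably
      ack(batch)                            // hypothetical: ack the source only now
    }
  } catch {
    case e: Exception => restart("Error receiving data", e)
  }
}
```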

I suspect Spark Streaming is not a good fit for your use case. Reading between the lines, it seems that either you want streaming, in which case you need a proper producer, or you are reading data that was dumped from the RDBMS to JSON, in which case you do not need Spark Streaming at all.
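In the latter case the whole job collapses to a plain batch read. A minimal sketch, assuming the same JSON path as above and no streaming context:

```scala
// Plain batch job: no StreamingContext, no receiver.
val sqlcontext = SQLContextSingleton.getInstance()
val df = sqlcontext.read.json("/home/cloudera/data/s.json")
df.show() // prints the records directly

// Mirroring the original flatMap over comma-separated strings
// (in Spark 1.6, DataFrame.toJSON returns an RDD[String]):
df.toJSON.flatMap(_.split(",")).foreach(println)
```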