0

我正在使用textFileStream將數據導入到Spark Streaming中。但數據只能處理一批。我的第一個問題是,它不是從文件中傳輸每一條記錄。textFileStream中的混淆

根據https://docs.databricks.com/spark/latest/rdd-streaming/debugging-streaming-applications.htmlFor TextFileStream, since files are input, the # of input events is always 0. In such cases, you can look at the 「Completed Batches」 section in the notebook to figure out how to find more information.

其次,我想知道有多少記錄被火花Engine.For例如處理,如果我流的1GB contanining 100K數據記錄的文件,我想知道如何很多Spark Streaming執行它。

任何人都可以請分享他們的想法或一些有用的鏈接。任何幫助,將不勝感激。

謝謝。

星火版本:2.0.1 資料擷取來自Amazon S3通過textFileStream

回答

0

還有就是看這個沒有直接的方法,但你可以實現這一點使用自定義代碼。

例如,當你處理由textFileStream產生的DSTREAM使用

dStream.forEachRDD{rdd => rdd.forEachPartition{part => { }} } 

因此可以簡單地添加內rdd.forEachPartition {}塊中的一些代碼,用於將在更新任一累加器或沒有記錄處理添加信息卡夫卡主題或添加信息飼養員甚至更新MySQL數據庫:)

dStream.forEachRDD{rdd => rdd.forEachPartition{part => { 
      var recordProcessed:Int = 0; ... 
      part.foreach{...;recordProcessed+=1} 
      //update recordProcessed in kafka/HBase/Mysql/Zookepeer 
    }} } 

其可進一步用於可視化沒有使用可視化工具的記錄過程。