2017-01-09 125 views
2

我正在使用Apache NiFi和Apache Spark爲大學做一個小型項目。我想創建一個NiFi工作流,它可以從HDFS中讀取TSV文件,並使用Spark Streaming我可以處理這些文件並在MySQL中存儲我需要的信息。我已經在NiFi中創建了我的工作流程,並且存儲部分已經在工作。問題是我無法解析NiFi包,所以我可以使用它們。使用spark解析NiFi數據包

的文件包含的行這樣的:

linea1File1 TheReceptionist 653 Entertainment 424 13021 4.34 1305 744 DjdA-5oKYFQ NxTDlnOuybo c-8VuICzXtU 

其中每個空間是一個標籤( 「\ t」 的)

這是我在星火代碼使用的Scala:

val ssc = new StreamingContext(config, Seconds(10)) 
val packet = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_ONLY)) 
val file = packet.map(dataPacket => new String(dataPacket.getContent, StandardCharsets.UTF_8)) 

在此之前,我可以在單個字符串中獲取整個文件(7000多行)......不幸的是,我無法將這個字符串拆分成多行。我需要整行獲取整個文件,所以我可以在一個對象中解析它,在其上應用一些操作並存儲我想要的內容

任何人都可以幫助我?

回答

3

每個數據包都將是來自NiFi的一個流文件的內容,因此如果NiFi從HDFS中拾取具有很多行的一個TSV文件,那麼所有這些行都將位於一個數據包中。

很難說沒有看到你的NiFi流,但你可能可以使用行數爲1的SplitText在你的TSFi中分裂你的TSV,然後才能觸發流式傳輸。

+0

非常感謝你......這完全解決了我的問題......我從來沒有想過用NiFi解決它..我專注於Spark ...... –