2015-04-21 72 views
2

我正在嘗試構建數據流管道以處理包含跨越多行的事件的文本文件。數據流SDK TextIO類假定每行都是新事件。從數據流中的文本文件處理多行事件

我的計劃是創建一個新的TextReader並將其註冊到DataPipelineRunner。這位新讀者將知道如何將多條線路聚合成一條線路。

我很確定這種方法可行,但我想知道這是否是正確的方法或者是否有更簡單的解決方案?

我試圖解析的文字是:

==============> len:45 pktype:4 mtype:2 
SYMBOL: USOCSTIA151632.00 
OPEN_INT: 212 
PR_OPEN_INTEREST: 212 
TIME_STAMP: 04/10/2015 06:30:17:420 val:1428661817 

結果應該是最後的4線連接到一起,並在第一線下降。

最好的問候, 彼得

+0

還有幾個問題:1)你有很多小文件,還是你的文件很大? (即是否要在一個文件中並行處理)2)可以通過查找「==============」來檢測文件中記錄的開頭是否正確>「? – jkff

+0

有很多大型(200G +)文件需要處理。以「===>」開頭的行確實表示有新的記錄,但我需要將該行從輸出中刪除。 –

+0

謝謝!我相應地更新了我的答案。 – jkff

回答

2

注意的TextReader是一個內部實現細節類,因此子類將是非常氣餒和挑戰做正確。

建議使用user-defined source API來定義基於文件的新格式,如子類FileBasedSource

在你的情況,我會建議立足於從文檔LineIO比如你的類,並且包裝定義有到自己的類LineReader它將使用LineReader作爲幫助閱讀各行,但:

  • startReading()它會跳過,直到以「====>」開頭的行
  • readNextRecord()它會讀取直到下一個「====>」的行並將它們捆綁到單個記錄中。

請務必仔細閱讀文檔,FileBasedSource和FileBasedReader:並行機制依賴於有描述的一致性屬性,您的格式必須滿足,確保記錄不重複或省略之間的界限相鄰的處理碎片。 XmlSource tests是如何對這些屬性進行單元測試的一個很好的例子。

請告訴我們它是怎麼回事,並報告任何問題或問題 - 我們對此API的反饋非常感興趣。

+0

是的,我很快發現擴展TextReader非常困難。我最初嘗試對子類進行分類,但遇到了一些問題。我會回頭追蹤並再次觀看。 –

+0

「LineIO示例」在哪裏? – absolutelyNoWarranty

相關問題