2017-02-17 29 views
0

數據流流管道輸出:數據流讀取文件DOFN與線條的流

一些源代碼上傳壓縮文件GCS - >上傳事件:發送到PubSub的(GS ///folder/file.gz) - >從PubSub的我流的數據流文件讀取事件/ O - > DOFN取消Gzip已

static class CustomDoFn extends DoFn<String, String>{ 

@Override 
public void processElement(ProcessContext c) throws Exception { 
    String gcsPath = c.element(); 
    Open ReadChannel with GCS 
    Get Stream from Channel 
    while((line = stream.ReadLine()) != null){ 
     c.output(line) // Is this good way to read and send line down the pipeline? 
    } 
} 

//將要管道

pipeline.apply(PubSubIO.Read()). 
      apply(ParDO.of(new CustomDoFn())). 
      apply(new CustomTX()). 
      apply(BigQueryIO.Write()); 

疑惑是:
1.是否正確的方法在DoFn中產生循環輸出?
2.如何在Dofn內使用FileBasedSource.FileBasedReader

+0

你好,我只是想確保我理解你的問題。你想創建一個從文件讀取的流式管道嗎?你想用Pub/Sub來接收文件名並讀取它們?你的文件大小非常大嗎?讀取文件和輸出每行的方式有一個問題,就是整個文件在實際發出之前必須讀入內存。對於大文件,這不起作用,可以OOM。 –

回答

0

目前無法使用具有動態文件名的FileBasedSource(即文件名未在管道構造中指定)。對Apache Beam 2.0(https://issues.apache.org/jira/browse/BEAM-65)的未來改進將啓用此功能,但尚未準備好使用。正如Alex Amato指出的那樣,您的概述方法將會受到大型文件的內存限制,但應以其他方式產生功能流水線。

+0

現在有一個更具體的JIRA提交給這個用例,我們正在接近能夠實現它https://issues.apache.org/jira/browse/BEAM-2511 TextIO應該支持閱讀一個PCollection文件名 – jkff