數據流流管道輸出:數據流讀取文件DOFN與線條的流
一些源代碼上傳壓縮文件GCS - >上傳事件:發送到PubSub的(GS ///folder/file.gz) - >從PubSub的我流的數據流文件讀取事件/ O - > DOFN取消Gzip已
static class CustomDoFn extends DoFn<String, String>{
@Override
public void processElement(ProcessContext c) throws Exception {
String gcsPath = c.element();
Open ReadChannel with GCS
Get Stream from Channel
while((line = stream.ReadLine()) != null){
c.output(line) // Is this good way to read and send line down the pipeline?
}
}
//將要管道
pipeline.apply(PubSubIO.Read()).
apply(ParDO.of(new CustomDoFn())).
apply(new CustomTX()).
apply(BigQueryIO.Write());
疑惑是:
1.是否正確的方法在DoFn中產生循環輸出?
2.如何在Dofn內使用FileBasedSource.FileBasedReader?
你好,我只是想確保我理解你的問題。你想創建一個從文件讀取的流式管道嗎?你想用Pub/Sub來接收文件名並讀取它們?你的文件大小非常大嗎?讀取文件和輸出每行的方式有一個問題,就是整個文件在實際發出之前必須讀入內存。對於大文件,這不起作用,可以OOM。 –