2016-04-29 25 views
1

我在Google雲端存儲中擁有自定義文件格式,我想從Google DataFlow中讀取它。如何在Google DataFlow中爲Google雲端存儲文件實現自定義文件解析器

我已經通過繼承FileBasedReader實現了一個Source和一個Reader,但後來我意識到它不支持從Google Cloud Storage中讀取(而FileBasedSink實際上......),所以我不確定什麼是最好的想法在這裏解決...

我試圖子類TextIO,但我不能達到目的,因爲它似乎並沒有被設計爲subclassed。

關於如何處理這個問題的任何好主意?

謝謝。

更新,以反映在註釋中使用

文件模式:gs://mybucket/my.json

從FileBasedSource實現源類:

MessageSource<T> extends FileBasedSource<T> 

實現Reader類(我真正關心這裏)從FileBasedReader:

MessageReader<T> extends FileBasedReader<T> 

個工藝讀數爲:

MySource source = // instantiate source 
Pipeline p = Pipeline.create(options); 
p.apply(TextIO.Read.from(options.getSource()).named("ReadFileData")) 
    .apply(ParDo.of(new DoFn<String, String>() { 

而且的getSource()來源於此命令行參數(覈對無誤):

--source=gs://${BUCKET_NAME}/my.json \ 

我錯過了什麼?

月2日更新

在運行source.getEstimatedSizeBytes(options)它告訴我沒有處理髮現的?

java.io.IOException: Unable to find handler for gs://mybucket/my.json 
at com.google.cloud.dataflow.sdk.util.IOChannelUtils.getFactory(IOChannelUtils.java:186) 
at com.google.cloud.dataflow.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:182) 
at com.etc.TrackingDataPipeline.main(TrackingDataPipeline.java:66) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:497) 
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293) 
at java.lang.Thread.run(Thread.java:745) 

我以爲FileBasedSource應該處理GCS?

+1

嗯,FileBasedReader絕對是打算用於Google雲端存儲。你能否編輯你的問題來澄清你在使用時遇到的問題? – jkff

+0

嗯...我可能錯過了它,但是我看不到在源代碼中對GCS的任何引用?無論如何,我遇到的問題是文件永遠不會被加載,並且不會觸發錯誤。我不知道如何調試。我還沒有找到任何使用FileBasedReader做同樣事情的例子。我會嘗試更新這個問題,以便根據您的評論@jkff – nembleton

+0

來反映這個問題,請更新這個問題並提供更多詳細信息 - 沒有這些,我可以做的不多。事實上,幾乎所有的資源,包括TextIO,都是在FileBasedSource/Reader的基礎上實現的。 – jkff

回答

2

從「第二次更新」中顯示的堆棧跟蹤中,您看起來像直接從main()方法調用了getEstimatedSizeBytes。預計這會導致您看到的錯誤。

標準URL方案處理程序是在構建管道運行程序時註冊的。在您的示例代碼中,當您撥打Pipeline.create(options)(這稱爲PipelineRunner.fromOptions(options),標準處理程序已註冊)時會發生這種情況。

如果您希望在運行管道以外的環境中註冊標準URL方案,您可以明確地調用IOChannelUtils.registerStandardIOFactories()。我應該注意到,這不是一個支持的API,但有點「隱藏」。因此,它可能隨時改變。

+0

是的你是對的。感謝細節。我後來意識到這一點,並得到了Google支持的證實。可悲的是,它與根本問題沒有關係。 – nembleton

相關問題