我在Google雲端存儲中擁有自定義文件格式,我想從Google DataFlow中讀取它。如何在Google DataFlow中爲Google雲端存儲文件實現自定義文件解析器
我已經通過繼承FileBasedReader實現了一個Source和一個Reader,但後來我意識到它不支持從Google Cloud Storage中讀取(而FileBasedSink實際上......),所以我不確定什麼是最好的想法在這裏解決...
我試圖子類TextIO,但我不能達到目的,因爲它似乎並沒有被設計爲subclassed。
關於如何處理這個問題的任何好主意?
謝謝。
更新,以反映在註釋中使用
文件模式:gs://mybucket/my.json
從FileBasedSource實現源類:
MessageSource<T> extends FileBasedSource<T>
實現Reader類(我真正關心這裏)從FileBasedReader:
MessageReader<T> extends FileBasedReader<T>
個工藝讀數爲:
MySource source = // instantiate source
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.from(options.getSource()).named("ReadFileData"))
.apply(ParDo.of(new DoFn<String, String>() {
而且的getSource()來源於此命令行參數(覈對無誤):
--source=gs://${BUCKET_NAME}/my.json \
我錯過了什麼?
月2日更新
在運行source.getEstimatedSizeBytes(options)
它告訴我沒有處理髮現的?
java.io.IOException: Unable to find handler for gs://mybucket/my.json
at com.google.cloud.dataflow.sdk.util.IOChannelUtils.getFactory(IOChannelUtils.java:186)
at com.google.cloud.dataflow.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:182)
at com.etc.TrackingDataPipeline.main(TrackingDataPipeline.java:66)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
at java.lang.Thread.run(Thread.java:745)
我以爲FileBasedSource應該處理GCS?
嗯,FileBasedReader絕對是打算用於Google雲端存儲。你能否編輯你的問題來澄清你在使用時遇到的問題? – jkff
嗯...我可能錯過了它,但是我看不到在源代碼中對GCS的任何引用?無論如何,我遇到的問題是文件永遠不會被加載,並且不會觸發錯誤。我不知道如何調試。我還沒有找到任何使用FileBasedReader做同樣事情的例子。我會嘗試更新這個問題,以便根據您的評論@jkff – nembleton
來反映這個問題,請更新這個問題並提供更多詳細信息 - 沒有這些,我可以做的不多。事實上,幾乎所有的資源,包括TextIO,都是在FileBasedSource/Reader的基礎上實現的。 – jkff