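Extracting zip contents using a ValueProvider as the path in Apache Beam
I have a piece of code that extracts the contents of a .ZIP file stored in Google Cloud Storage. It works fine, but I need to use it with a file path that will be supplied at runtime ("gs://some_bucket/filename.zip"). When I try to use the runtime value, I get an error like this: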
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize [email protected]
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
at org.apache.beam.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:83)
at org.apache.beam.sdk.io.Read$Bounded.<init>(Read.java:94)
at org.apache.beam.sdk.io.Read$Bounded.<init>(Read.java:89)
at org.apache.beam.sdk.io.Read.from(Read.java:48)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Read.expand(BigQueryIO.java:535)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Read.expand(BigQueryIO.java:292)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:164)
at BeamTest2.StarterPipeline.main(StarterPipeline.java:180)
Caused by: java.io.NotSerializableException: org.apache.beam.sdk.Pipeline
at java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
at java.io.ObjectOutputStream.writeSerialData(Unknown Source)
at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
at java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
at java.io.ObjectOutputStream.writeSerialData(Unknown Source)
at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
at java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
at java.io.ObjectOutputStream.writeSerialData(Unknown Source)
at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
at java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.io.ObjectOutputStream.writeObject(Unknown Source)
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
... 11 more
The code I'm using is:
// Unzip incoming file
PCollection<TableRow> temp = p.apply(BigQueryIO.read().fromQuery(
    NestedValueProvider.of(
        options.getInputFile(),
        new SerializableFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String apply(String filepath) {
                try {
                    List<GcsPath> gcsPaths = util.expand(GcsPath.fromUri(filepath));
                    LOG.info(gcsPaths + "FilesUnzipped");
                    List<String> paths = new ArrayList<String>();
                    for (GcsPath gcsp : gcsPaths) {
                        paths.add(gcsp.toString());
                    }
                    p.apply(Create.of(paths))
                        .apply(ParDo.of(new UnzipFN(filepath)));
                } catch (Exception e) {
                    LOG.info("Exception caught while extracting ZIP");
                }
                return "";
            }
        })).usingStandardSql().withoutValidation());
The UnzipFN class:
public class UnzipFN extends DoFn<String, Long> {
    private long filesUnzipped = 0;

    @ProcessElement
    public void processElement(ProcessContext c) {
        String p = c.element();
        GcsUtilFactory factory = new GcsUtilFactory();
        GcsUtil u = factory.create(c.getPipelineOptions());
        byte[] buffer = new byte[100000000];
        try {
            SeekableByteChannel sek = u.open(GcsPath.fromUri(p));
            InputStream is = Channels.newInputStream(sek);
            BufferedInputStream bis = new BufferedInputStream(is);
            ZipInputStream zis = new ZipInputStream(bis);
            ZipEntry ze = zis.getNextEntry();
            while (ze != null) {
                LOG.info("Unzipping File {}", ze.getName());
                WritableByteChannel wri = u.create(GcsPath.fromUri("gs://bucket_location/" + ze.getName()), getType(ze.getName()));
                OutputStream os = Channels.newOutputStream(wri);
                int len;
                while ((len = zis.read(buffer)) > 0) {
                    os.write(buffer, 0, len);
                }
                os.close();
                filesUnzipped++;
                ze = zis.getNextEntry();
            }
            zis.closeEntry();
            zis.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
        c.output(filesUnzipped);
        System.out.println(filesUnzipped + "FilesUnzipped");
        LOG.info("FilesUnzipped");
    }

    private String getType(String fName) {
        if (fName.endsWith(".zip")) {
            return "application/x-zip-compressed";
        } else {
            return "text/plain";
        }
    }
}
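How should I handle this scenario?
P.S. - The .zip extraction code has nothing to do with BigQueryIO.read(). I'm only using it as a hack to read the runtime value. If you have any other suggestions, please let me know.
Thanks.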
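The SerializableFunction in your NestedValueProvider always returns the empty string "" - is that intentional? The collection produced by applying UnzipFN is also ignored. – jkff
Also, it looks like you are trying to add new graph steps to the pipeline from inside the SerializableFunction of your NestedValueProvider. That is not possible: a pipeline is first constructed and then executed, so you cannot add new steps at run time. I'm confused about what you are trying to do, so I'm not sure how to help you do it - please clarify what you want to accomplish. – jkff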
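The stack trace in the question (NotSerializableException: org.apache.beam.sdk.Pipeline) is consistent with the anonymous SerializableFunction referencing `p` inside apply(): the captured object becomes a hidden field of the function, so serializing the function also tries to serialize the pipeline. Below is a minimal plain-Java sketch of that effect with no Beam involved. `FakePipeline`, `SerFn`, and `CaptureDemo` are hypothetical stand-ins invented for the illustration, not Beam classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a non-serializable object such as a pipeline.
class FakePipeline {
    // Deliberately does not implement Serializable.
}

// Hypothetical stand-in for a serializable function interface.
interface SerFn<I, O> extends Serializable {
    O apply(I input);
}

class CaptureDemo {
    // Returns true if Java serialization accepts the object, false if it
    // fails with NotSerializableException.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Captures the pipeline object, like referencing `p` inside apply().
    static SerFn<String, String> capturing(final FakePipeline p) {
        return new SerFn<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String apply(String path) {
                return path + "@" + p.hashCode();
            }
        };
    }

    // Uses only its own input, so nothing non-serializable is captured.
    static SerFn<String, String> selfContained() {
        return new SerFn<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String apply(String path) {
                return path.trim();
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(capturing(new FakePipeline())));
        System.out.println(isSerializable(selfContained()));
    }
}
```

The first function fails to serialize only because it drags the captured object along. Removing the reference avoids the exception, although it would not by itself make it possible to add steps from inside the function.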
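@jkff Yes, that is intentional. So basically the collection produced by UnzipFN is not used. UnzipFN's only job is to unzip and extract the contents of a .zip file whose path will be provided at runtime. So what I mean to ask is: how do I unzip a file whose GCS location is provided at runtime? If there is another way besides what I'm doing, please let me know. – rish0097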
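One shape that fits the constraint described in the comments is to build a fixed step up front, hand it the provider rather than the resolved path, and call get() only while the step is processing. In Beam terms that would presumably mean a DoFn holding a ValueProvider<String> field and reading it inside @ProcessElement. The sketch below shows the idea in plain Java under loud assumptions: `Supplier<String>` stands in for ValueProvider<String>, an in-memory map stands in for GCS, and `RuntimeUnzipSketch`, `UnzipStep`, and `sampleZip` are hypothetical names. It also streams entries through a small buffer and closes the stream with try-with-resources.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

class RuntimeUnzipSketch {
    // Hypothetical stand-in for a DoFn: keeps the provider, not the resolved path.
    static final class UnzipStep {
        private final Supplier<String> path;        // stands in for ValueProvider<String>
        private final Map<String, byte[]> storage;  // stands in for GCS objects keyed by URI

        UnzipStep(Supplier<String> path, Map<String, byte[]> storage) {
            this.path = path;
            this.storage = storage;
        }

        // Resolves the path only now, at processing time, then extracts every entry.
        Map<String, byte[]> process() throws IOException {
            String resolved = path.get();
            byte[] archive = storage.get(resolved);
            if (archive == null) {
                throw new IOException("No such object: " + resolved);
            }
            return unzip(new ByteArrayInputStream(archive));
        }
    }

    // Streams each entry through a small reusable buffer; the stream is closed on exit.
    static Map<String, byte[]> unzip(InputStream in) throws IOException {
        Map<String, byte[]> files = new LinkedHashMap<>();
        byte[] buffer = new byte[8192];
        try (ZipInputStream zis = new ZipInputStream(in)) {
            for (ZipEntry ze = zis.getNextEntry(); ze != null; ze = zis.getNextEntry()) {
                if (ze.isDirectory()) {
                    continue;
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                int len;
                while ((len = zis.read(buffer)) > 0) {
                    out.write(buffer, 0, len);
                }
                files.put(ze.getName(), out.toByteArray());
            }
        }
        return files;
    }

    // Builds a tiny one-entry archive in memory so the sketch needs no real files.
    static byte[] sampleZip(String name, String text) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ZipOutputStream zos = new ZipOutputStream(bytes)) {
            zos.putNextEntry(new ZipEntry(name));
            zos.write(text.getBytes(StandardCharsets.UTF_8));
            zos.closeEntry();
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        Map<String, byte[]> storage = new LinkedHashMap<>();
        storage.put("gs://some_bucket/filename.zip", sampleZip("a.txt", "hello"));

        String[] runtimeArg = new String[1];              // unknown while the step is built
        UnzipStep step = new UnzipStep(() -> runtimeArg[0], storage);
        runtimeArg[0] = "gs://some_bucket/filename.zip";  // supplied later, "at runtime"

        System.out.println(step.process().keySet());
    }
}
```

The step is constructed before the path is known, and nothing is added to the graph afterwards. Only the value read inside process() depends on the runtime argument.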