1

我有一個apache-beam應用程序,它使用數據流運行器在本地運行直接運行程序和谷歌雲。它在本地工作,但失敗的谷歌數據流亞軍。如何在Apache Beam中序列化運行時創建的類

下面是錯誤的痕跡:

(9938ce94c0752c7):了java.lang.RuntimeException:com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException:無法反序列化序列化DoFnInfo com.google.cloud.dataflow.worker.MapTaskExecutorFactory $ 3.typedApply(MapTaskExecutorFactory.java:283) com.google.cloud.dataflow.worker.MapTaskExecutorFactory $ 3.typedApply (MapTaskExecutorFactory.java:253) 在com.google.cloud.dataflow.worker.graph.Networks $ TypeSafeNodeFunction.apply(Networks.java:55) 在com.google.cloud.dataflow.worker.graph.Networks $ TypeSafeNodeFunction.apply(Networks.java:43) 在com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:78) 在com.google.cloud.dataflow.worker.MapTaskExecutorFactory.create(MapTaskExecutorFactory.java:142) 在com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:271) 在com.google.cloud。 dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244) at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness $ WorkerThread.doWork(DataflowBatchWorkerHarness.java:135) at com.google.cloud.dataflow.worker。 DataflowBatchWorkerHarness $ WorkerThread.call(DataflowBatchWorkerHarness.java:115) at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness $ W orgThread.call(DataflowBatchWorkerHarness.java:102) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java。 util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
引起:com.google.cloud.dataflow.worker.repackaged.com .google.common.util.concurrent.UncheckedExecutionException:java.lang.IllegalArgumentException異常:無法反序列化序列化DoFnInfo 在com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache $ Segment.get( LocalCache.java:2214) at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053) 在com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache $ LocalManualCache.get(LocalCache.java:4899) 在com.google.cloud.dataflow.worker.UserParDoFnFactory.create( UserParDoFnFactory.java:95) at com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:66) at com.google.cloud.dataflow.worker.MapTaskExecutorFactory.createParDoOperation(MapTaskExecutorFactory.java:360) 在com.google.cloud.dataflow.worker.MapTaskExecutorFactory $ 3.typedApply(MapTaskExecutorFactory.java:271) ...... 14多個
造成的:java.lang.IllegalArgumentException異常:無法反序列化序列化DoFnInfo 在org.apache .beam.sdk.util.SerializableUtils.deserializeFromByteArray(Seriali zableUtils.java:75) at com.google.cloud.dataflow.worker.UserParDoFnFactory $ UserDoFnExtractor.getDoFnInfo(UserParDoFnFactory.java:64) at com.google.cloud.dataflow.worker.UserParDoFnFactory $ 1.call(UserParDoFnFactory.java :100) 在com.google.cloud.dataflow.worker.UserParDoFnFactory $ 1.call(UserParDoFnFactory.java:97) 在com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache $ LocalManualCache $ 1.load(LocalCache.java:4904) ,位於com.google.cloud.dataflow.worker.repackaged.com.google.common.cache。LocalCache $ LoadingValueReference.loadFuture(LocalCache.java:3628) at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache $ Segment.loadSync(LocalCache.java:2336) at com。 google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache $ Segment.lockedGetOrLoad(LocalCache.java:2295) ,位於com.google.cloud.dataflow.worker.repackaged.com.google.common。 cache.LocalCache $ Segment.get(LocalCache.java:2208) ...... 20多個
引起的:在java.net.URLClassLoader.findClass(URLClassLoader.java:381) 在Header_H :拋出java.lang.ClassNotFoundException java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher $ AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at java.io ObjectInputStream.java:628) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) at java.io.ObjectInputStream .readClass(ObjectInputStream.java:1486) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1336) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) at java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:1942) at java.io.Object InputStream.readOrdinaryObject(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) at java.io.ObjectInputStream。在java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)處的readSerialData(ObjectInputStream.java:1942) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.readObject( ObjectInputStream.java:373) 在org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:72) ...... 28多個

它指向

「...無法反序列化序列化DoFnInfo」

「...拋出java.lang.ClassNotFoundException:Header_H」

我懷疑這與我使用bytebuddy代碼創建類Header_H有關。我使用bytebuddy在現有源代碼基礎上構建了基於some.class的子類,並在運行時從配置文件獲得了額外的用戶輸入,即Header_H僅在運行時變爲可用。

我bytebuddy代碼有點像這樣:

builder = new ByteBuddy().subclass(some.class).name("Header_H").modifiers(PUBLIC); 
     .defineField("serialVersionUID", long.class, STATIC, PRIVATE, FINAL).value(37L) 
     .implement(Serializable.class); 

Class <?> clazz = builder.make().load(getClass().getClassLoader()).getLoaded(); 

然後clazz(在這種情況下Header_H)將被傳遞到數據流的管道。當我在臨時谷歌雲階段位置檢查了jar文件的內容時,我看到some.class而不是Header_H.class,這可能會導致錯誤「ClassNotFoundException」。

因此,如果我的推理是正確的,那麼我如何讓Beam將運行時創建的類放入jar文件中發送到數據流運行器,因爲我的類創建中有implement(Serializable.class)

回答

1

字節的好友可以通過在JAR文件注入一類:

DynamicType.Unloaded<?> type = builder.make(); 
builder.inject(someJar); 

這將改變現有的jar文件以包括動態生成的類。這樣,您可以更改已存在於系統類路徑中的現有jar。

此API還允許您創建一個新的jar,並且您可以使用Instrumentation API(通過Java代理),它允許您將此類作爲新的jar文件追加到類路徑中。爲避免附加代理,您還可以嘗試將字節好友代理項目用於動態附件。

這會工作方式:

File someFolder = ... 
File jar = builder.saveIn(someFolder); 
ByteBuddyAgent.install().appendToSystemClassLoaderSearch(new JarFile(jar)); 

如果動態連接是不允許的谷歌雲,你也許可以通過在命令行定期連接到解決這個問題。

+0

在上面的代碼中,你的意思是'type.inject(somejar)'和'type.saveIn(somefolder)'? – bignano

1

Dataflow運行器不控制JAR文件的內容 - 它僅解析程序的類路徑,從磁盤讀取JAR並將它們複製到GCS上管道的暫存目錄。現在,梁沒有提供一種方法來發布類路徑中未包含在JAR中的類。

您可能需要在管道規範中找到只使用這些JAR中的類的方法,但是當然您仍然可以在DoFn或其他本地運行的代碼中使用ByteBuddy。但請注意,任何在工作人員之間交付的內容(例如PCollection的內容)仍然必須是可序列化的(可在一個工作人員上進行序列化,並在另一個工作人員上進行反序列化)或者擁有編碼器。

或者,可能有一種方法可以讓ByteBuddy生成JAR並將其動態添加到程序的類路徑中。這可能會起作用,但這是一個ByteBuddy特定的問題,我對ByteBuddy不太瞭解如何去做。

+0

謝謝@jkff。讓我們來看看是否有人能夠幫助澄清Bytebuddy方面。 – bignano