2015-10-30

I am performing a leftOuterJoin on DataSets using Scala and Flink 1.0-SNAPSHOT, and I get the following exception: Apache Flink fromCollection java.lang.IllegalStateException: unread block data

11:54:15,921 INFO org.apache.flink.runtime.taskmanager.Task - CHAIN DataSource (at com.mycompany.FlinkTest$.main(FlinkTest.scala:99) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at select('date as 'date,'dsCode as 'dsCode,'datatype as 'datatype,'quote as 'quote,'name as 'name)) (1/1) switched to FAILED with exception. 
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563) 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509) 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509) 
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) 
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) 
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.lang.Exception: Call to registerInputOutput() of invokable failed 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:529) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.IllegalStateException: unread block data 
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2431) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) 
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294) 
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:255) 
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282) 
    at org.apache.flink.runtime.operators.DataSourceTask.initInputFormat(DataSourceTask.java:241) 
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:81) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526) 

I am using a simple Scala case class as the type of the DataSet:

case class RawValue(date: String, dsCode: String, datatype: String, quote: Double, name: String) 

I use the following method to generate the case class instances:

def getRawValuesFromZipFile(fileName: String) : Array[RawValue] 
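The body of this method is not shown in the question; a minimal sketch of one possible implementation, assuming the zip entries contain semicolon-separated text lines (the delimiter, field order, and entry layout are all assumptions):

```scala
import java.io.FileInputStream
import java.util.zip.ZipInputStream
import scala.io.Source

case class RawValue(date: String, dsCode: String, datatype: String, quote: Double, name: String)

// Hypothetical implementation (the question only shows the signature).
// Assumes each zip entry holds lines of the form: date;dsCode;datatype;quote;name
def getRawValuesFromZipFile(fileName: String): Array[RawValue] = {
  val zis = new ZipInputStream(new FileInputStream(fileName))
  val values = scala.collection.mutable.ArrayBuffer.empty[RawValue]
  try {
    var entry = zis.getNextEntry
    while (entry != null) {
      if (!entry.isDirectory) {
        // ZipInputStream reports EOF at the end of the current entry,
        // so getLines() stops at the entry boundary without closing the stream.
        for (line <- Source.fromInputStream(zis).getLines() if line.nonEmpty) {
          val f = line.split(';')
          values += RawValue(f(0), f(1), f(2), f(3).toDouble, f(4))
        }
      }
      entry = zis.getNextEntry
    }
  } finally {
    zis.close()
  }
  values.toArray
}
```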

I initialize the environment and create the DataSet[RawValue] in the following way:

val env = ExecutionEnvironment.createLocalEnvironment(4) 
val rawValues = env.fromCollection(getRawValuesFromZipFile("filename.zip")) 
rawValues.print 

I suspect a serialization problem is causing the error. I compiled the project against Scala 2.10.5 and the Java 7 system libraries. I am using Eclipse, and the project was generated by the sample-project generation script.
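For context, fromCollection wraps the data in a CollectionInputFormat, which ships the whole collection through plain Java serialization inside the job configuration; that is exactly where the stack trace above fails (InstantiationUtil.readObjectFromConfig). One way to test the serialization suspicion outside Flink is to check that the elements survive a plain Java-serialization round trip (a diagnostic sketch, not Flink API):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

case class RawValue(date: String, dsCode: String, datatype: String, quote: Double, name: String)

// Diagnostic sketch: fromCollection relies on Java serialization of the
// collection elements, so they must round-trip cleanly outside Flink too.
def roundTrip[T <: java.io.Serializable](value: T): T = {
  val bos = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bos)
  out.writeObject(value)
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
  in.readObject().asInstanceOf[T]
}
```

If the elements round-trip fine but the job still fails, a classpath or version mismatch between the compiled classes and the classes visible to the TaskManager (e.g. different Scala or Flink versions) is a common cause of `unread block data`. For roughly 3 million records it may also be better not to ship the data in the job graph at all, e.g. by writing the records to a delimited file and reading them with env.readCsvFile[RawValue].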

Any help or hints on solving this problem would be greatly appreciated :-) Thanks, Daniel


How large is the data you are reading from the zip file? – aljoscha


About 22 MB compressed, 90 MB uncompressed, 3,065,295 RawValue records. Thanks! –


If I run it on a hardcoded sample Array[RawValue] of 4 elements, the join runs fine. Investigating further. –

Answer
