2017-06-03 91 views
0

我試圖在Flink(Scala)中使用Zeppelin開發K均值模型。 這是我簡單的代碼部分:Flink:ERROR解析數值格式

//Reading data 
val mapped : DataSet[Vector] = data.map {x => DenseVector (x._1,x._2) } 

//Create algorithm 
val knn = KNN() 
    .setK(3) 
    .setBlocks(10) 
    .setDistanceMetric(SquaredEuclideanDistanceMetric()) 
    .setUseQuadTree(false) 
    .setSizeHint(CrossHint.SECOND_IS_SMALL) 
... 
//Just to learn I use the same data predicting the model 
val result = knn.predict(mapped).collect() 

當我打印的資料或使用預測方法,我得到這個錯誤

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed. 
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:409) 
    at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:95) 
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:382) 
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:369) 
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:344) 
    at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:211) 
    at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:188) 
    at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172) 
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896) 
    at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:637) 
    at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547) 
    ... 36 elided 
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:822) 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768) 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768) 
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) 
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) 
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: org.apache.flink.api.common.io.ParseException: Line could not be parsed: '-6.59 -44.68' 
ParserError NUMERIC_VALUE_FORMAT_ERROR 
Expect field types: class java.lang.Double, class java.lang.Double 
in file: /home/borja/flink/kmeans/points 
    at org.apache.flink.api.common.io.GenericCsvInputFormat.parseRecord(GenericCsvInputFormat.java:407) 
    at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:110) 
    at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:470) 
    at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78) 
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585) 
    at java.lang.Thread.run(Thread.java:748) 

我不知道這是否是我的錯裝數據或它與that之類的東西有關。

感謝您的幫助! :)

回答

1

您沒有向我們展示您用於讀取和分析數據的代碼,這是錯誤發生的地方。但考慮到錯誤消息,我會冒險猜測您正在使用readCSVFile和由空格或製表符分隔的數據,並且未指定fieldDelimiter(默認爲逗號)。如果是這種情況,請參閱docs瞭解如何配置CSV解析器。

+0

**非常感謝您**我使用LibreOffice打開了文件,並將fieldDelimiter替換爲*空格*而不是逗號。我永遠不會想到它,所以再次感謝! :) – Borja