2015-10-27
  1. I am running WordCount in the Flink interactive shell. It works for a file size of 10 MB, but with a 100 MB file the shell throws a NullPointerException:

flink: Flink Shell throws NullPointerException

java.lang.NullPointerException 
    at org.apache.flink.api.common.accumulators.SerializedListAccumulator.deserializeList(SerializedListAccumulator.java:93) 
    at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:549) 
    at .<init>(<console>:22) 

at .<clinit>(<console>) 
at .<init>(<console>:7) 
at .<clinit>(<console>) 
at $print(<console>) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734) 
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983) 
at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573) 
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604) 
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568) 
at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760) 
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805) 
at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717) 
at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581) 
at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588) 
at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591) 
at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ILoop.scala:601) 
at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ILoop.scala:598) 
at scala.reflect.io.Streamable$Chars$class.applyReader(Streamable.scala:104) 
at scala.reflect.io.File.applyReader(File.scala:82) 
at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ILoop.scala:598) 
at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply(ILoop.scala:598) 
at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply(ILoop.scala:598) 
at scala.tools.nsc.interpreter.ILoop.savingReplayStack(ILoop.scala:130) 
at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1.apply(ILoop.scala:597) 
at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1.apply(ILoop.scala:597) 
at scala.tools.nsc.interpreter.ILoop.savingReader(ILoop.scala:135) 
at scala.tools.nsc.interpreter.ILoop.interpretAllFrom(ILoop.scala:596) 
at scala.tools.nsc.interpreter.ILoop$$anonfun$loadCommand$1.apply(ILoop.scala:660) 
at scala.tools.nsc.interpreter.ILoop$$anonfun$loadCommand$1.apply(ILoop.scala:659) 
at scala.tools.nsc.interpreter.ILoop.withFile(ILoop.scala:653) 
at scala.tools.nsc.interpreter.ILoop.loadCommand(ILoop.scala:659) 
at scala.tools.nsc.interpreter.ILoop$$anonfun$standardCommands$7.apply(ILoop.scala:262) 
at scala.tools.nsc.interpreter.ILoop$$anonfun$standardCommands$7.apply(ILoop.scala:262) 
at scala.tools.nsc.interpreter.LoopCommands$LineCmd.apply(LoopCommands.scala:81) 
at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:712) 
at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581) 
at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588) 
at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591) 
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882) 
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837) 
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837) 
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837) 
at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:84) 
at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:54) 
at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala) 

I am running on a Linux system (16 GB RAM). What could be the problem there?

My code (adapted from https://ci.apache.org/projects/flink/flink-docs-release-0.9/quickstart/scala_api_quickstart.html):

val filename = "<myFileName>"
val text = env.readTextFile(filename)
val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
val result = counts.collect()
  • I also noticed that Flink executes the program on only one core. After setting the parallelism with env.getConfig.setParallelism(4) and running the program again, another exception is thrown:
  • Part 1:

    org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed. 
        at org.apache.flink.client.program.Client.run(Client.java:413) 
        at org.apache.flink.client.program.Client.run(Client.java:356) 
        at org.apache.flink.client.program.Client.run(Client.java:349) 
        at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89) 
        at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82) 
        at org.apache.flink.api.java.ScalaShellRemoteEnvironment.execute(ScalaShellRemoteEnvironment.java:68) 
        at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789) 
        at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576) 
        at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544) 
        at .<init>(<console>:28) 
        at .<clinit>(<console>) 
        at .<init>(<console>:7) 
        at .<clinit>(<console>) 
        at $print(<console>) 
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
        at java.lang.reflect.Method.invoke(Method.java:606) 
        at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734) 
        at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983) 
        at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573) 
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604) 
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568) 
        at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760) 
        at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805) 
        at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717) 
        at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581) 
        at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588) 
        at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591) 
        at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ILoop.scala:601) 
        at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ILoop.scala:598) 
        at scala.reflect.io.Streamable$Chars$class.applyReader(Streamable.scala:104) 
        at scala.reflect.io.File.applyReader(File.scala:82) 
        at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ILoop.scala:598) 
        at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply(ILoop.scala:598) 
        at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply(ILoop.scala:598) 
        at scala.tools.nsc.interpreter.ILoop.savingReplayStack(ILoop.scala:130) 
        at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1.apply(ILoop.scala:597) 
        at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1.apply(ILoop.scala:597) 
        at scala.tools.nsc.interpreter.ILoop.savingReader(ILoop.scala:135) 
        at scala.tools.nsc.interpreter.ILoop.interpretAllFrom(ILoop.scala:596) 
        at scala.tools.nsc.interpreter.ILoop$$anonfun$loadCommand$1.apply(ILoop.scala:660) 
        at scala.tools.nsc.interpreter.ILoop$$anonfun$loadCommand$1.apply(ILoop.scala:659) 
        at scala.tools.nsc.interpreter.ILoop.withFile(ILoop.scala:653) 
        at scala.tools.nsc.interpreter.ILoop.loadCommand(ILoop.scala:659) 
        at scala.tools.nsc.interpreter.ILoop$$anonfun$standardCommands$7.apply(ILoop.scala:262) 
        at scala.tools.nsc.interpreter.ILoop$$anonfun$standardCommands$7.apply(ILoop.scala:262) 
        at scala.tools.nsc.interpreter.LoopCommands$LineCmd.apply(LoopCommands.scala:81) 
        at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:712) 
        at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581) 
        at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588) 
        at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591) 
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882) 
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837) 
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837) 
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
        at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837) 
        at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:84) 
        at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:54) 
        at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala) 
    

    Part 2:

    Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314) 
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) 
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) 
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) 
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43) 
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29) 
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) 
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29) 
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92) 
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
        at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) 
        at akka.dispatch.Mailbox.run(Mailbox.scala:221) 
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231) 
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
    Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #0 (CHAIN DataSource (at .<init>(<console>:26) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at .<init>(<console>:27)) -> Map (Map at .<init>(<console>:27)) -> Combine(SUM(1)) (2/4)) @ (unassigned) - [SCHEDULED] > with groupID <fc507fbb50fea681c726ca1d824c7577> in sharing group < SlotSharingGroup [fc507fbb50fea681c726ca1d824c7577, fb90f780c9d5a4a9dbf983cb06bec946, 52b8abe5a21ed808f0473a599d89f046] >. Resources available to scheduler: Number of instances=1, total number of slots=1, available slots=0 
        at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:250) 
        at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:126) 
        at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:271) 
        at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:430) 
        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:307) 
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:508) 
        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:606) 
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190) 
        ... 18 more 
    

    Does this refer to taskmanager.numberOfTaskSlots? In my flink-conf.yaml this key is set to 4. But how can I set it in the shell?

    Answer


    You asked two questions:

    1. Why does print() not work for large DataSets?

    When you use count(), collect(), or print() on a DataSet, all data from the partitions on the task managers has to be transferred through the job manager to the client. It is best to use these methods only for testing or for small DataSets. For large data, use one of the sinks that Apache Flink provides, e.g. writeAsText(...). One output file is created per parallel task.

    If you still want to transfer all the data to the client, you can do so by increasing Akka's frame size. Akka is the messaging library that Flink uses under the hood. To do this, set akka.framesize in flink-conf.yaml. The default is 10485760 bytes (10 MB); akka.framesize: 100mb increases it to 100 MB.
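    For example, the relevant entry in flink-conf.yaml might look like this (the value is illustrative; choose a size that fits your result set):

    ```yaml
    # flink-conf.yaml — raise the Akka frame size used when shipping
    # collect()/print() results to the client (default: 10485760 bytes)
    akka.framesize: 100mb
    ```

    The setting takes effect after restarting the job manager and task managers.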

    For Apache Flink 1.0, some committers have considered removing this limitation, and there is already a request for a different transfer mechanism for large materialized DataSets.

    2. What are task slots, and how are they related to parallelism?

    Flink's default configuration starts one task slot per task manager. When you start the Scala shell in local mode, it starts only one task manager, so the total number of task slots is one. When you change the parallelism to N, you need at least N task slots to execute the job with that parallelism. So either increase the number of task slots in flink-conf.yaml, or start additional task managers. If you are only running locally, I would suggest simply increasing the number of task slots. For more information, see the Flink documentation at http://flink.apache.org.
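    With a parallelism of 4, the local setup needs at least four slots; a minimal flink-conf.yaml fragment for that would be:

    ```yaml
    # flink-conf.yaml — allow up to 4 parallel subtasks per TaskManager,
    # so a job with parallelism 4 can run on a single local TaskManager
    taskmanager.numberOfTaskSlots: 4
    ```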

    Edit: If you run the Scala shell, only one task manager of the embedded Flink cluster is started. You can start a local cluster with ./bin/start-local.sh and then connect to it using the Scala shell's host and port arguments (host: localhost, port: 6123).


    OK, parallel execution works using start-local.sh and setParallelism. Thanks for the advice. But now I have a problem with the writeAsText() method. I thought this method would create a file containing the result of my application, but that did not work. I used counts.writeAsText("file:/// /.txt") and it did not create a file. I also tried creating a file manually first and using its name, but that did not work either. Am I using the method wrong? – lary


    I also have a general question. I want to compare Flink's performance with a single-threaded implementation of the same algorithm, which is why I used collect(): I want to materialize the result DataSet, because I think that is appropriate for this comparison. Do you think the writeAsText() method would also work there? What exactly does that method do? – lary


    Maybe you forgot to call env.execute() after writing the file? The reason you need execute is that, by default, all Flink sinks are lazy and nothing runs until execution is started. – mxm
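    A minimal sketch of the pattern mxm describes, assuming a Flink 0.9/0.10 Scala shell where env is already bound (the output path is illustrative):

    ```scala
    // Sinks such as writeAsText only declare the output; they are lazy.
    val counts = env.readTextFile("<myFileName>")
      .flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)
    counts.writeAsText("file:///tmp/wordcount-result")

    // Nothing is written until execute() is called. Note that with
    // parallelism > 1 the path above becomes a directory containing
    // one output file per parallel task.
    env.execute("WordCount")
    ```

    (collect() and print() trigger execution implicitly, which is why the earlier code worked without an explicit execute() call.)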
