
Spark Mongo Hadoop connector not mapping data

I am trying to map data from the mongodb-hadoop connector into a Spark application. I had no other errors before this point, so I assume the connection to MongoDB is successful. I am mapping the data with the code below:

JavaRDD<AppLog> logs = documents.map(
    new Function<Tuple2<Object, BSONObject>, AppLog>() {
        public AppLog call(final Tuple2<Object, BSONObject> tuple) {
            AppLog log = new AppLog();
            BSONObject header = (BSONObject) tuple._2().get("headers");

            log.setTarget((String) header.get("target"));
            log.setAction((String) header.get("action"));

            return log;
        }
    }
);

The code fails with this:

16/10/12 19:42:31 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) 
java.lang.NullPointerException 
    at com.hbfinance.DataframeExample$1.call(DataframeExample.java:64) 
    at com.hbfinance.DataframeExample$1.call(DataframeExample.java:57) 
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1027) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:372) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119) 
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
16/10/12 19:42:31 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NullPointerException 
    at com.hbfinance.DataframeExample$1.call(DataframeExample.java:64) 
    at com.hbfinance.DataframeExample$1.call(DataframeExample.java:57) 
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1027) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:372) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119) 
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

16/10/12 19:42:31 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job 
16/10/12 19:42:31 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/10/12 19:42:31 INFO TaskSchedulerImpl: Cancelling stage 0 
16/10/12 19:42:31 INFO DAGScheduler: ShuffleMapStage 0 (show at DataframeExample.java:83) failed in 1.704 s 
16/10/12 19:42:31 INFO DAGScheduler: Job 0 failed: show at DataframeExample.java:83, took 2.370636 s 
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NullPointerException 
    at com.hbfinance.DataframeExample$1.call(DataframeExample.java:64) 
    at com.hbfinance.DataframeExample$1.call(DataframeExample.java:57) 
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1027) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:372) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119) 
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839) 
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215) 
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207) 
    at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386) 
    at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) 
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904) 
    at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385) 
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1315) 
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1378) 
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178) 
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:402) 
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363) 
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:371) 
    at com.hbfinance.DataframeExample.run(DataframeExample.java:83) 
    at com.hbfinance.DataframeExample.main(DataframeExample.java:89) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.NullPointerException 
    at com.hbfinance.DataframeExample$1.call(DataframeExample.java:64) 
    at com.hbfinance.DataframeExample$1.call(DataframeExample.java:57) 
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1027) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:372) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119) 
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
16/10/12 19:42:31 INFO SparkContext: Invoking stop() from shutdown hook 
16/10/12 19:42:31 INFO SparkUI: Stopped Spark web UI at http://178.62.18.22:4040 
16/10/12 19:42:31 INFO DAGScheduler: Stopping DAGScheduler 
16/10/12 19:42:31 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
16/10/12 19:42:31 INFO MemoryStore: MemoryStore cleared 
16/10/12 19:42:31 INFO BlockManager: BlockManager stopped 
16/10/12 19:42:31 INFO BlockManagerMaster: BlockManagerMaster stopped 
16/10/12 19:42:31 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 
16/10/12 19:42:31 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon. 
16/10/12 19:42:31 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports. 
16/10/12 19:42:31 INFO SparkContext: Successfully stopped SparkContext 
16/10/12 19:42:31 INFO ShutdownHookManager: Shutdown hook called 
16/10/12 19:42:31 INFO ShutdownHookManager: Deleting directory /tmp/spark-c9ba1976-0c79-4a43-b261-1e7c98139a6e 

Answer


One of the variables used in your mapper class has a null value.

What could be null?

  • the tuple argument itself
  • the second element of the tuple, tuple._2()
  • or the second element has no headers object inside it, so header is null

We don't have your test data, so you have to check for yourself what can be null. This is not a Spark problem; it is in your own code.

Just debug your application, or add if-else checks, and you will find which value is null. If the tuple argument itself is the problem, you can do documents.filter(x -> x != null).map(...) to filter all null values out of the RDD. A null-safe version of the mapper is sketched below.
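As a minimal sketch (assuming the AppLog class and the documents RDD from the question, and assuming the failure comes from documents that lack the headers sub-document), the mapper can be made null-safe like this:

    JavaRDD<AppLog> logs = documents
        // keep only entries that actually carry a "headers" sub-document
        .filter(new Function<Tuple2<Object, BSONObject>, Boolean>() {
            public Boolean call(final Tuple2<Object, BSONObject> tuple) {
                return tuple._2() != null && tuple._2().get("headers") != null;
            }
        })
        // safe to dereference "headers" after the filter above
        .map(new Function<Tuple2<Object, BSONObject>, AppLog>() {
            public AppLog call(final Tuple2<Object, BSONObject> tuple) {
                AppLog log = new AppLog();
                BSONObject header = (BSONObject) tuple._2().get("headers");
                log.setTarget((String) header.get("target"));
                log.setAction((String) header.get("action"));
                return log;
            }
        });

This uses the same anonymous Function classes as the question, so it compiles against the pre-Java-8 Spark Java API as well.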


Thank you sir, you are a legend. It was the headers, I think: I took the .get("headers") out of the example and it worked.
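For completeness, a hypothetical reconstruction of what that comment describes (assuming the target and action fields actually sit at the top level of each document rather than under a headers sub-document):

    JavaRDD<AppLog> logs = documents.map(
        new Function<Tuple2<Object, BSONObject>, AppLog>() {
            public AppLog call(final Tuple2<Object, BSONObject> tuple) {
                // read the fields straight from the document root, no "headers" lookup
                AppLog log = new AppLog();
                log.setTarget((String) tuple._2().get("target"));
                log.setAction((String) tuple._2().get("action"));
                return log;
            }
        }
    );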
