
I want to set StorageLevel.MEMORY_AND_DISK_SER in my Spark Streaming application, hoping it will prevent a MetadataFetchFailedException. How do I set StorageLevel.MEMORY_AND_DISK_SER for KafkaUtils.createDirectStream?

I don't know where I should pass StorageLevel.MEMORY_AND_DISK, since createDirectStream does not seem to accept that parameter.

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topicsSet) 

Full error:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460) 
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456) 
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) 
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) 
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456) 
    at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183) 
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47) 
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) 
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:262) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

Why do you think setting the storage level would help? A `MetadataFetchFailedException` says that your Spark executors may have been lost or were under heavy GC pressure.
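In line with that comment, the more common remedy is to give the executors more memory headroom rather than change the storage level. A minimal, illustrative sketch; the app name, the memory figures, and the YARN-specific overhead key are assumptions, not values from the question:

import org.apache.spark.SparkConf

// Assumed values for illustration only; tune to your cluster.
val conf = new SparkConf()
  .setAppName("MyStreamingApp")                        // hypothetical app name
  .set("spark.executor.memory", "4g")                  // larger heap to reduce GC pressure
  .set("spark.yarn.executor.memoryOverhead", "1024")   // off-heap headroom so YARN does not kill executors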

Answers


We can use ReceiverLauncher.launch to create DStreams from Kafka. Please find sample code below that sets the storage level for data streamed from Kafka.

Properties props = new Properties();
props.put("zookeeper.hosts", "x.x.x.x");
props.put("zookeeper.port", "2181");
props.put("zookeeper.broker.path", "/brokers");
props.put("kafka.topic", "some-topic");
props.put("kafka.consumer.id", "12345");
props.put("zookeeper.consumer.connection", "x.x.x.x:2181");
props.put("zookeeper.consumer.path", "/consumer-path");
//Optional Properties
props.put("consumer.forcefromstart", "true");
props.put("consumer.fetchsizebytes", "1048576");
props.put("consumer.fillfreqms", "250");
props.put("consumer.backpressure.enabled", "true");

SparkConf _sparkConf = new SparkConf().setAppName("KafkaReceiver")
        .set("spark.streaming.receiver.writeAheadLog.enable", "false");

JavaStreamingContext jsc = new JavaStreamingContext(_sparkConf,
        new Duration(5000));

//Specify the number of Receivers you need.
//It should be less than or equal to the number of partitions of your topic.

int numberOfReceivers = 3;

JavaDStream<MessageAndMetadata> unionStreams = ReceiverLauncher.launch(jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY());

unionStreams
        .foreachRDD(new Function2<JavaRDD<MessageAndMetadata>, Time, Void>() {

            @Override
            public Void call(JavaRDD<MessageAndMetadata> rdd,
                    Time time) throws Exception {

                System.out.println("Number of records in this batch is " + rdd.count());
                return null;
            }
        });

jsc.start();
jsc.awaitTermination();
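A note on this approach: ReceiverLauncher and MessageAndMetadata are not part of Spark itself; as far as I can tell they come from the third-party kafka-spark-consumer package, which is a receiver-based consumer. It does let you pass a StorageLevel at creation time, but it is a different API from the direct stream the question asks about.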

Use persist before you start the StreamingContext (and hence the Spark Streaming application):

import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER 
dstream.persist(MEMORY_AND_DISK_SER) 

This, however, may lead to the exception below, so you should first transform the DStream somehow, e.g. with map, to get a DStream of serializable objects; a sketch of that follows the stack trace.

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0 in stage 21.0 (TID 29) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord 
Serialization stack: 
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = topic1, partition = 0, offset = 0, CreateTime = 1482512625196, checksum = 1892285426, serialized key size = -1, serialized value size = 11, key = null, value = hello world)) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1456) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1444) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1443) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1443) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1671) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1626) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1615) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2015) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2036) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1353) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
    at org.apache.spark.rdd.RDD.take(RDD.scala:1326) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:735) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
    at scala.util.Try$.apply(Try.scala:192) 
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:255) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:255) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:255) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:254) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745)
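For completeness, a minimal sketch of that map-then-persist step, assuming the Spark 2.x / Kafka 0.10 direct stream API (the one that yields ConsumerRecord, as in the trace above). The broker address, group id, and topic name are placeholders, and ssc is the StreamingContext from the question:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",   // placeholder broker list
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group"              // placeholder consumer group
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Set("topic1"), kafkaParams))

// ConsumerRecord is not serializable, so map to plain key/value pairs first...
val pairs = stream.map(record => (record.key, record.value))

// ...and only then set the storage level.
pairs.persist(MEMORY_AND_DISK_SER)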