
My Spark Streaming code works seamlessly in the Eclipse IDE. However, when I run it on my local Spark cluster, it fails with org.apache.spark.util.TaskCompletionListenerException.

In spark-submit "client" deploy mode the code also runs fine until I start my Kafka producer; as soon as the producer starts, I get the error below.

I start the local cluster with the command sh $SPARK_HOME/sbin/start-all.sh

and invoke spark-submit through this script:

#!/bin/sh 

SP_SUBMIT=/home/user/spark/bin/spark-submit 
DEP_MODE=client 


$SP_SUBMIT \ 
--deploy-mode $DEP_MODE \ 
--class com.alind.sparkStream.Test \ 
--master spark://clstr:7077 \ 
--name alind \
/home/user/jar/com.alind-0.0.1-SNAPSHOT.jar

When Spark Streaming starts receiving messages, I get this error:

2015-06-29 16:13:56 ERROR JobScheduler:96 - Error running job streaming job 1435574590600 ms.3 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 306.0 failed 1 times, most recent failure: Lost task 0.0 in stage 306.0 (TID 164, localhost): org.apache.spark.util.TaskCompletionListenerException 
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76) 
    at org.apache.spark.scheduler.Task.run(Task.scala:58) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:220) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
2015-06-29 16:13:56 WARN JobProgressListener:71 - Task start for unknown stage 307 
2015-06-29 16:13:56 WARN JobProgressListener:71 - Task start for unknown stage 308 
2015-06-29 16:13:56 WARN JobProgressListener:71 - Task start for unknown stage 309 
2015-06-29 16:13:56 INFO SparkContext:59 - Starting job: foreach at Test.java:428 
2015-06-29 16:13:56 INFO MapOutputTrackerMaster:59 - Size of output statuses for shuffle 34 is 84 bytes 
2015-06-29 16:13:56 INFO MapOutputTrackerMaster:59 - Size of output statuses for shuffle 35 is 84 bytes 
2015-06-29 16:13:56 INFO DAGScheduler:59 - Got job 94 (foreach at Test.java:428) with 2 output partitions (allowLocal=false) 
2015-06-29 16:13:56 INFO DAGScheduler:59 - Final stage: Stage 327(foreach at Test.java:428) 
2015-06-29 16:13:56 INFO DAGScheduler:59 - Parents of final stage: List(Stage 320, Stage 317, Stage 324, Stage 321, Stage 318, Stage 325, Stage 322, Stage 326, Stage 323, Stage 319) 
2015-06-29 16:13:56 INFO ShuffledDStream:59 - Slicing from 1435574619500 ms to 1435574620400 ms (aligned to 1435574619500 ms and 1435574620400 ms) 
2015-06-29 16:13:56 INFO DAGScheduler:59 - Missing parents: List(Stage 320, Stage 317, Stage 318, Stage 319) 
2015-06-29 16:13:56 INFO DAGScheduler:59 - Submitting Stage 317 (MappedRDD[234] at mapToPair at Test.java:157), which has no missing parents 
2015-06-29 16:13:56 INFO MemoryStore:59 - ensureFreeSpace(4024) called with curMem=386851, maxMem=278302556 
2015-06-29 16:13:56 INFO MemoryStore:59 - Block broadcast_129 stored as values in memory (estimated size 3.9 KB, free 265.0 MB) 
2015-06-29 16:13:56 INFO MemoryStore:59 - ensureFreeSpace(2230) called with curMem=390875, maxMem=278302556 
2015-06-29 16:13:56 INFO MemoryStore:59 - Block broadcast_129_piece0 stored as bytes in memory (estimated size 2.2 KB, free 265.0 MB) 
2015-06-29 16:13:56 INFO BlockManagerInfo:59 - Added broadcast_129_piece0 in memory on localhost:42836 (size: 2.2 KB, free: 265.3 MB) 
2015-06-29 16:13:56 INFO BlockManagerMaster:59 - Updated info of block broadcast_129_piece0 
2015-06-29 16:13:56 INFO SparkContext:59 - Created broadcast 129 from getCallSite at DStream.scala:294 
2015-06-29 16:13:56 INFO DAGScheduler:59 - Submitting 1 missing tasks from Stage 317 (MappedRDD[234] at mapToPair at Test.java:157) 
2015-06-29 16:13:56 INFO TaskSchedulerImpl:59 - Adding task set 317.0 with 1 tasks 
2015-06-29 16:13:56 INFO TaskSetManager:59 - Starting task 0.0 in stage 317.0 (TID 168, localhost, NODE_LOCAL, 7642 bytes) 
2015-06-29 16:13:56 INFO Executor:59 - Running task 0.0 in stage 317.0 (TID 168) 
2015-06-29 16:13:56 INFO KafkaRDD:103 - Computing topic test, partition 0 offsets 252661 -> 253192 
2015-06-29 16:13:56 INFO VerifiableProperties:68 - Verifying properties 
2015-06-29 16:13:56 INFO VerifiableProperties:68 - Property group.id is overridden to 
2015-06-29 16:13:56 INFO VerifiableProperties:68 - Property zookeeper.connect is overridden to 
2015-06-29 16:13:56 ERROR TaskContextImpl:96 - Error in TaskCompletionListener 
java.lang.NullPointerException 
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158) 
    at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63) 
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101) 
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101) 
    at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49) 
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68) 
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:58) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
2015-06-29 16:13:56 ERROR Executor:96 - Exception in task 0.0 in stage 317.0 (TID 168) 
org.apache.spark.util.TaskCompletionListenerException 
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76) 
    at org.apache.spark.scheduler.Task.run(Task.scala:58) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
2015-06-29 16:13:56 WARN TaskSetManager:71 - Lost task 0.0 in stage 317.0 (TID 168, localhost): org.apache.spark.util.TaskCompletionListenerException 
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76) 
    at org.apache.spark.scheduler.Task.run(Task.scala:58) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

My pom.xml looks like this:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 
    <groupId>alinds</groupId> 
    <artifactId>alind</artifactId> 
    <version>0.0.1-SNAPSHOT</version> 

    <build> 
     <sourceDirectory>src</sourceDirectory> 
     <plugins> 

      <plugin> 
       <groupId>org.apache.maven.plugins</groupId> 
       <artifactId>maven-shade-plugin</artifactId> 
       <version>2.1</version> 
       <executions> 
        <execution> 
         <phase>package</phase> 
         <goals> 
          <goal>shade</goal> 
         </goals> 
         <configuration> 
          <filters> 
           <filter> 
            <artifact>*:*</artifact> 
            <excludes> 
             <exclude>META-INF/*.SF</exclude> 
             <exclude>META-INF/*.DSA</exclude> 
             <exclude>META-INF/*.RSA</exclude> 
            </excludes> 
           </filter> 
          </filters> 
          <transformers> 
           <transformer 
            implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> 
            <resource>reference.conf</resource> 
           </transformer> 
           <transformer 
            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> 
            <mainClass> 
             com.alind.sparkStream.Test 
            </mainClass> 
           </transformer> 
          </transformers> 
         </configuration> 
        </execution> 
       </executions> 
      </plugin> 

      <plugin> 
       <groupId>org.apache.maven.plugins</groupId> 
       <artifactId>maven-compiler-plugin</artifactId> 
       <version>2.3.2</version> 
       <configuration> 
        <source>${jdk.version}</source> 
        <target>${jdk.version}</target> 
       </configuration> 
      </plugin> 

     </plugins> 
    </build> 
    <dependencies> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-core_2.10</artifactId> 
      <version>1.3.1</version> 
      <scope>provided</scope> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming_2.10</artifactId> 
      <version>1.3.1</version> 
      <scope>provided</scope> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming-kafka_2.10</artifactId> 
      <version>1.3.1</version> 

     </dependency> 

     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka_2.10</artifactId> 
      <version>0.8.2.1</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka-clients</artifactId> 
      <version>0.8.2.0</version> 
      <scope>provided</scope> 
     </dependency> 

     <dependency> 
      <groupId>MyOtherProject</groupId> 
      <!-- hypothetical artifactId; a Maven dependency must declare one --> 
      <artifactId>MyOtherProject</artifactId> 
      <version>1.0</version> 
     </dependency> 

    </dependencies> 
    <repositories> 
     <repository> 
      <id>Spark repository</id> 
      <url>http://www.sparkjava.com/nexus/content/repositories/spark/</url> 
     </repository> 
    </repositories> 
</project> 

And the Spark driver looks like this...

import java.util.HashMap; 
import java.util.HashSet; 
import java.util.Map; 
import java.util.Set; 

import org.apache.log4j.Logger; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaPairInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.kafka.KafkaUtils; 

import kafka.serializer.StringDecoder; 

public class Test { 



    static Logger log = Logger.getLogger(Test.class.getName()); 



    public static void main(String[] args) { 

     System.setProperty("spark.serializer", 
       "org.apache.spark.serializer.KryoSerializer"); 


     SparkConf sparkConf = new SparkConf(); 

     sparkConf.setMaster("spark://clstr:7077"); 
     // When running from Eclipse, I change the setMaster value to "local[2]". 
     sparkConf.setAppName("alind"); 
     JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); 
     JavaStreamingContext javaStreamingContext = new JavaStreamingContext(
       javaSparkContext, new Duration(100)); 

     Set<String> topics = new HashSet<String>(); 
     topics.add("test"); 

     Map<String, String> kafkaParams = new HashMap<String, String>(); 
     kafkaParams.put("metadata.broker.list", "10.20.3.14:9092"); 
     // Tested metadata.broker.list with localhost:9092 as well; the job fails on the cluster with either value. 
     JavaPairInputDStream<String, String> stream = KafkaUtils 
       .createDirectStream(javaStreamingContext, String.class, 
         String.class, StringDecoder.class, StringDecoder.class, 
         kafkaParams, topics); 
     stream.print(); 


     javaStreamingContext.start(); 
     javaStreamingContext.awaitTermination(); 

    } 
} 

I would appreciate any ideas about what is wrong with the local cluster. It looks like something on the Kafka side is broken.

I am getting a similar error. Were you able to find the cause? – Saket

Hey Saket, this is still an unresolved question... I will try to dig into it again this weekend. –

Answer


I had the same problem, and the cause was that one of my decoders had an incorrect constructor. The exception is very misleading in this respect.

Incorrect class:

class ReadingCreatedDecoder() 
    extends Decoder[Message[ReadingCreated]] 
    with ReadingCreatedAvroSupport 

Correct version (note props: VerifiableProperties):

class ReadingCreatedDecoder(props: VerifiableProperties = null) 
    extends Decoder[Message[ReadingCreated]] 
    with ReadingCreatedAvroSupport 

PS: I am using Scala, not Java.
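
Since the question's code is Java, here is a minimal sketch of the same fix there, assuming a hypothetical custom decoder class MyStringDecoder: Kafka instantiates decoder classes reflectively through a constructor that takes kafka.utils.VerifiableProperties, so any custom Decoder handed to KafkaUtils.createDirectStream needs to declare that constructor.

import java.nio.charset.StandardCharsets; 

import kafka.serializer.Decoder; 
import kafka.utils.VerifiableProperties; 

// Hypothetical custom decoder. The VerifiableProperties constructor is the 
// crucial part: Kafka creates decoder instances reflectively through it. 
public class MyStringDecoder implements Decoder<String> { 

    public MyStringDecoder(VerifiableProperties props) { 
        // props can carry serializer settings; unused in this sketch 
    } 

    @Override 
    public String fromBytes(byte[] bytes) { 
        return new String(bytes, StandardCharsets.UTF_8); 
    } 
} 

Without that constructor, the reflective lookup fails while the Kafka iterator is being set up, which would then surface as the misleading NullPointerException in KafkaRDDIterator.close seen in the log above.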
