2017-06-13

I am using the following code to read from a Kafka topic and process the data, and I keep hitting "KafkaConsumer is not safe for multi-threaded access":

// `records` must be a field on the enclosing class (a local variable could not
// be reassigned from inside the anonymous Function), initialised once as:
// JavaRDD<Row> records = ss.emptyDataFrame().toJavaRDD();

JavaDStream<Row> transformedMessages = messages.flatMap(record -> processData(record))
    .transform(new Function<JavaRDD<Row>, JavaRDD<Row>>() {
        public JavaRDD<Row> call(JavaRDD<Row> rdd) throws Exception {
            // Accumulate every batch seen so far by unioning it into `records`
            records = rdd.union(records);
            return rdd;
        }
    });

transformedMessages.foreachRDD(rdd -> {
    StructType schema = DataTypes.createStructType(fields);

    // Build a DataFrame over all accumulated rows, not just the current batch
    Dataset<Row> ds = ss.createDataFrame(records, schema);
    ds.createOrReplaceTempView("trades");
    System.out.println(ds.count());
    ds.show();
});

When I run the code, I get the exception below:

Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access 
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1624) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1197) 
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95) 
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69) 
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228) 
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194) 
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) 
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 

I only have one DStream, so I don't understand why I am getting this exception. I am reading from 3 partitions of the Kafka topic, and I assume "createDirectStream" will create 3 consumers to read the data.
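For context, a typical direct-stream setup for this scenario looks roughly like the sketch below. The broker address, group id, topic name, and the jssc streaming context are placeholders, since the actual stream-creation code is not shown in the question:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "broker:9092");            // placeholder broker address
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "trades-group");                    // placeholder group id

// One direct stream over the 3-partition topic; the kafka010 integration
// caches one consumer per topic-partition on each executor
JavaInputDStream<ConsumerRecord<String, String>> messages =
    KafkaUtils.createDirectStream(
        jssc,                                   // JavaStreamingContext, assumed in scope
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.Subscribe(Arrays.asList("trades"), kafkaParams));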

Below is the acquire method from the KafkaConsumer code:

private void acquire() {
    this.ensureNotClosed();
    long threadId = Thread.currentThread().getId();
    // The consumer records the id of the thread that owns it; -1 means unowned.
    // If a different thread already owns it and the CAS from -1 fails, the
    // consumer is being used from two threads at once.
    if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {
        throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
    } else {
        this.refcount.incrementAndGet();
    }
}

That's odd. Are you running locally or on a cluster? If a cluster, what kind? Could you add the code where you create the stream, and the implementation of 'processData'? – maasg


It looks like this is a nasty bug: https://issues.apache.org/jira/browse/SPARK-19185 – maasg


I am running locally, but the Kafka topic is on a cluster. The 'processData' method just deserializes the messages we get from the stream. As I understand it, one consumer reads from one Kafka partition; so in this case either multiple consumers are accessing the same Kafka partition, or the consumers are being shuffled. –

Answer


Spark 2.2.0 has a workaround that does not use the cache: just set spark.streaming.kafka.consumer.cache.enabled to false. Take a look at the pull request.
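As a minimal sketch, the flag can be set on the SparkConf when the streaming context is built; the app name, master, and batch interval below are placeholders:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf conf = new SparkConf()
    .setAppName("TradesStream")    // placeholder app name
    .setMaster("local[*]")         // the question runs locally
    // Workaround for SPARK-19185: disable the cached Kafka consumer so tasks
    // do not contend for the same single-threaded consumer instance
    .set("spark.streaming.kafka.consumer.cache.enabled", "false");

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));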