2015-04-24 69 views
3

我目前正在寫了一個生產者和消費者的斯卡拉應用。生產者從外部獲取一些數據並在卡夫卡內部寫入數據。消費者從卡夫卡讀取並寫入Elasticsearch。星火流和ElasticSearch - 無法寫入所有條目

消費者基於Spark Streaming,每隔5秒從Kafka獲取新消息並將它們寫入ElasticSearch。問題是我無法寫入ES,因爲我收到了很多如下所示的錯誤:

錯誤] [2015-04-24 11:21:14,734] [org.apache.spark .TaskContextImpl]: TaskCompletionListener中的錯誤 org.elasticsearch.hadoop.EsHadoopException:無法寫入全部 條目[3/26560](也許ES被重載?)。在... org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:225) 〜[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:236) 〜[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3]在 org.elasticsearch.hadoop.rest .RestService $ PartitionWriter.close(RestService.java:125) 〜[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3]在 org.elasticsearch.spark.rdd.EsRDDWriter $$ anonfun $ $寫$ 1.適用MCV $ SP(EsRDDWriter.scala:33) 〜[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3]在 org.apache.spark.TaskContextImpl $$ anon $ 2.onTaskCompletion(TaskContextImpl.scala:57) 〜[spark-core _2.10-1.2.1.jar:1.2.1]在 org.apache.spark.TaskContextImpl $$ anonfun $ $ markTaskCompleted 1.適用(TaskContextImpl.scala:68) [火花core_2.10-1.2.1。 jar:1.2.1] at org.apache.spark.TaskContextImpl $$ anonfun $ markTaskCompleted $ 1.apply(TaskContextImpl.scala:66) [spark-core_2.10-1.2.1.jar:1.2.1] at scala.collection.mutable.ResizableArray $ class.foreach(ResizableArray.scala:59) [na:na] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) [na:na] at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66) [火花core_2.10-1.2.1.jar:1.2.1]在 org.apache.spark.scheduler.Task.run(任務。 scala:58) [spark-core_2.10-1.2.1.jar:1.2.1] at org.apache.spark.executor.Executor $ TaskRunner.run(Executor.scala:200) [spark-core_2.10-1.2。 1.jar:1.2.1] java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor。的java:615) [NA:1.7.0_65]在java.lang.Thread.run(Thread.java:745)[NA:1.7.0_65]

認爲生產者寫入6個郵件每隔15秒,所以我真的不明白這種「過載」是如何發生的(我甚至清理了這個話題,並清理了所有的舊信息,我認爲它涉及到fset問題)。通過星火每5秒流執行的任務可以通過下面的代碼來概括:

val result = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map("wasp.raw" -> 1), StorageLevel.MEMORY_ONLY_SER_2) 
    val convertedResult = result.map(k => (k._1 ,AvroToJsonUtil.avroToJson(k._2))) 



    //TO-DO : Remove resource (yahoo/yahoo) hardcoded parameter 
    log.info(s"*** EXECUTING SPARK STREAMING TASK + ${java.lang.System.currentTimeMillis()}***") 


    convertedResult.foreachRDD(rdd => { 
     rdd.map(data => data._2).saveToEs("yahoo/yahoo", Map("es.input.json" -> "true")) 

    }) 

如果我嘗試打印,而不是發送到ES消息,一切都很好,我實際上只看到6條信息。爲什麼我不能寫入ES?

爲了完整起見,我使用這個庫來寫入ES:elasticsearch-spark_2.10和最新的beta版本。

+0

我得到了同樣的錯誤嘗試從火花數據框中寫回一個大表ES(但不流)。我的默認設置是使用100個執行程序,所以基本上有100個併發連接到我們小型的ES羣集。爲我工作的解決方案是將數據幀重新分區到少量分區(在我的情況下爲10),以限制spark可能產生的最大併發連接數。 – patricksurry

回答

3

經過多次重試之後,我發現了一種寫入ElasticSearch而不會出現任何錯誤的方法。基本上通過參數"es.batch.size.entries" -> "1"到saveToES方法解決了這個問題。我不明白爲什麼使用默認或其他批處理大小會導致上述錯誤,因爲如果我試圖寫入比允許的最大批處理大小更多的東西,而不是更少,則會出現錯誤消息。

此外,我已經注意到,實際上我正在寫給ES,但不是所有的消息,我每批次丟失1到3條消息。

1

其中一種可能性是羣集/分片狀態爲RED。請解決這個問題,這可能是由於未分配的副本。狀態變爲綠色後,API調用成功。

2

當我在Spark上將數據幀推送到ES時,我有相同的錯誤消息。即使使用"es.batch.size.entries" -> "1"配置,我也有同樣的錯誤。 一旦我在ES中增加了線程池,我可以找出這個問題。

例如,

散裝池

threadpool.bulk.type: fixed 
threadpool.bulk.size: 600 
threadpool.bulk.queue_size: 30000