我目前正在寫了一個生產者和消費者的斯卡拉應用。生產者從外部獲取一些數據並在卡夫卡內部寫入數據。消費者從卡夫卡讀取並寫入Elasticsearch。星火流和ElasticSearch - 無法寫入所有條目
消費者基於Spark Streaming,每隔5秒從Kafka獲取新消息並將它們寫入ElasticSearch。問題是我無法寫入ES,因爲我收到了很多如下所示的錯誤:
錯誤] [2015-04-24 11:21:14,734] [org.apache.spark .TaskContextImpl]: TaskCompletionListener中的錯誤 org.elasticsearch.hadoop.EsHadoopException:無法寫入全部 條目[3/26560](也許ES被重載?)。在... org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:225) 〜[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:236) 〜[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3]在 org.elasticsearch.hadoop.rest .RestService $ PartitionWriter.close(RestService.java:125) 〜[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3]在 org.elasticsearch.spark.rdd.EsRDDWriter $$ anonfun $ $寫$ 1.適用MCV $ SP(EsRDDWriter.scala:33) 〜[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3]在 org.apache.spark.TaskContextImpl $$ anon $ 2.onTaskCompletion(TaskContextImpl.scala:57) 〜[spark-core _2.10-1.2.1.jar:1.2.1]在 org.apache.spark.TaskContextImpl $$ anonfun $ $ markTaskCompleted 1.適用(TaskContextImpl.scala:68) [火花core_2.10-1.2.1。 jar:1.2.1] at org.apache.spark.TaskContextImpl $$ anonfun $ markTaskCompleted $ 1.apply(TaskContextImpl.scala:66) [spark-core_2.10-1.2.1.jar:1.2.1] at scala.collection.mutable.ResizableArray $ class.foreach(ResizableArray.scala:59) [na:na] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) [na:na] at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66) [火花core_2.10-1.2.1.jar:1.2.1]在 org.apache.spark.scheduler.Task.run(任務。 scala:58) [spark-core_2.10-1.2.1.jar:1.2.1] at org.apache.spark.executor.Executor $ TaskRunner.run(Executor.scala:200) [spark-core_2.10-1.2。 1.jar:1.2.1] java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor。的java:615) [NA:1.7.0_65]在java.lang.Thread.run(Thread.java:745)[NA:1.7.0_65]
認爲生產者寫入6個郵件每隔15秒,所以我真的不明白這種「過載」是如何發生的(我甚至清理了這個話題,並清理了所有的舊信息,我認爲它涉及到fset問題)。通過星火每5秒流執行的任務可以通過下面的代碼來概括:
val result = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map("wasp.raw" -> 1), StorageLevel.MEMORY_ONLY_SER_2)
val convertedResult = result.map(k => (k._1 ,AvroToJsonUtil.avroToJson(k._2)))
//TO-DO : Remove resource (yahoo/yahoo) hardcoded parameter
log.info(s"*** EXECUTING SPARK STREAMING TASK + ${java.lang.System.currentTimeMillis()}***")
convertedResult.foreachRDD(rdd => {
rdd.map(data => data._2).saveToEs("yahoo/yahoo", Map("es.input.json" -> "true"))
})
如果我嘗試打印,而不是發送到ES消息,一切都很好,我實際上只看到6條信息。爲什麼我不能寫入ES?
爲了完整起見,我使用這個庫來寫入ES:elasticsearch-spark_2.10和最新的beta版本。
我得到了同樣的錯誤嘗試從火花數據框中寫回一個大表ES(但不流)。我的默認設置是使用100個執行程序,所以基本上有100個併發連接到我們小型的ES羣集。爲我工作的解決方案是將數據幀重新分區到少量分區(在我的情況下爲10),以限制spark可能產生的最大併發連接數。 – patricksurry