1
我試圖將Kafka流轉換爲RDD並將這些RDD插入到Elasticsearch數據庫中。這是我的代碼:Pyspark使用saveAsNewAPIHadoopFile將DStream數據寫入Elasticsearch
conf = SparkConf().setAppName("ola")
sc = SparkContext(conf=conf)
es_write_conf = {
"es.nodes": "localhost",
"es.port": "9200",
"es.resource": "pipe/word"
}
ssc = StreamingContext(sc, 2)
brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])
value_counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
value_counts.transform(lambda rdd: rdd.map(f))
value_counts.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf)
ssc.start()
ssc.awaitTermination()
saveAsNewAPIHadoopFile函數應該將這些RDD寫入ES。但是我得到這個錯誤:
value_counts.saveAsNewAPIHadoopFile(
AttributeError: 'TransformedDStream' object has no attribute 'saveAsNewAPIHadoopFile'
轉換函數應該能夠將流轉換爲Spark數據幀。我如何將這些RDD寫入Elasticsearch?謝謝!
謝謝您的建議!但是,現在我得到了一個巨大的錯誤:6/12/29 19:23:06警告EsOutputFormat:爲Reducer啓用了投機執行 - 考慮禁用它以防止數據損壞 16/12/29 19:23:06 WARN EsOutputFormat:Can not確定任務ID 16/12/29 19:23:07 ERROR執行程序:階段83.0(TID 55)的任務0.0中的異常org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest:找到不可恢復的錯誤[127.0.0.1:9200]返回錯誤請求(400) - 解析失敗;壓縮器檢測只能在xcontent字節或壓縮的xcontent字節上調用;出局.. –
我不熟悉ES,所以我在這裏不會有任何幫助。當您手動保存單個RDD時它工作嗎? – user7337271