1

我試圖將Kafka流轉換爲RDD並將這些RDD插入到Elasticsearch數據庫中。這是我的代碼:Pyspark使用saveAsNewAPIHadoopFile將DStream數據寫入Elasticsearch

conf = SparkConf().setAppName("ola") 
sc = SparkContext(conf=conf) 
es_write_conf = { 
    "es.nodes": "localhost", 
    "es.port": "9200", 
    "es.resource": "pipe/word" 
} 

ssc = StreamingContext(sc, 2) 
brokers, topic = sys.argv[1:] 
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) 
lines = kvs.map(lambda x: x[1]) 
value_counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b) 

value_counts.transform(lambda rdd: rdd.map(f)) 
value_counts.saveAsNewAPIHadoopFile(
    path='-', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_write_conf) 

ssc.start() 
ssc.awaitTermination() 

saveAsNewAPIHadoopFile函數應該將這些RDD寫入ES。但是我得到這個錯誤:

value_counts.saveAsNewAPIHadoopFile(
    AttributeError: 'TransformedDStream' object has no attribute 'saveAsNewAPIHadoopFile' 

轉換函數應該能夠將流轉換爲Spark數據幀。我如何將這些RDD寫入Elasticsearch?謝謝!

回答

0

您可以使用foreachRDD

value_counts.foreachRDD(lambda rdd: rdd.saveAsNewAPIHadoopFile(...)) 
+0

謝謝您的建議!但是,現在我得到了一個巨大的錯誤:6/12/29 19:23:06警告EsOutputFormat:爲Reducer啓用了投機執行 - 考慮禁用它以防止數據損壞 16/12/29 19:23:06 WARN EsOutputFormat:Can not確定任務ID 16/12/29 19:23:07 ERROR執行程序:階段83.0(TID 55)的任務0.0中的異常org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest:找到不可恢復的錯誤[127.0.0.1:9200]返回錯誤請求(400) - 解析失敗;壓縮器檢測只能在xcontent字節或壓縮的xcontent字節上調用;出局.. –

+0

我不熟悉ES,所以我在這裏不會有任何幫助。當您手動保存單個RDD時它工作嗎? – user7337271