2016-07-25 68 views
0

我有一個Spark Streaming作業輸出一些當前存儲在HDFS中的日誌,我想用logstash處理它們。不幸的是,雖然有一個插件可以在hdfs中寫入logstash,但它實際上是從hdfs中讀取的如何將Spark輸出鏈接到Logstash輸入

我有搜索解決方案來鏈接這兩個部分,但在python api的Spark流中,存儲某些東西的唯一方法是將它作爲文本文件寫入hdfs,所以我必須從hdfs讀取! 我無法在本地保存它們,因爲Spark在羣集上運行,並且我不想從每個節點獲取所有數據。

目前我運行一個非常髒的腳本,每2秒將hdfs目錄的內容複製到本地。但是這個解決方案顯然不令人滿意。

有沒有人知道一個軟件可以幫助我把Spark的輸出發送到Logstash?

在此先感謝!

編輯:我使用Python &星火1.6.0

+0

這些是由Log4j生成的日誌嗎? –

+0

不,這是由Spark處理的apache日誌,它基於機器學習算法增加了一些功能。 –

回答

0

這似乎是使用Kafka十全十美的工作。在Spark Streaming作業中,寫入Kafka,然後使用Logstash中的記錄。

stream.foreachRDD { rdd => 
    rdd.foreachPartition { partition => 
    val producer = createKafkaProducer() 
    partition.foreach { message => 
     val record = ... // convert message to record 
     producer.send(record) 
    } 
    producer.close() 
    } 
}