2017-03-01 74 views
0

在Flume代理中,我收集來自Kafka主題的元素,並且需要將它們插入到ES中。不過,我需要在接收器中執行以前的消解過程,所以我需要編寫一個定製接收器將代理通道中的數據傳遞給java消解模塊(我已經寫過)。如何在Flume 1.7中編寫自定義ES接收器

任何人都可以與我分享一個自定義接收器的模板,並可以用作參考? Flumes官方網站並沒有多說這個話題: 啓動Flume代理時,自定義接收器的類及其依賴關係必須包含在代理的類路徑中。自定義接收器的類型是其FQCN。 https://flume.apache.org/FlumeUserGuide.html#custom-sink

而一旦自定義接收已經準備好,我怎麼能聯繫以下三個文件,以使代理工作:

  • 自定義接收
  • 攝入JAR(Java的模塊來執行攝取過程)
  • FlumeAgent.properties

感謝您的任何反饋。一旦我完成這項任務,我會繼續添加信息。

回答

1

希望您正在嘗試使用Flume從Kafka(源代碼)接收事件並將其轉發到ES(接收器),並且已有一些數據處理邏輯。

有了這個理解,我建議你看看Flume攔截器,它負責在發送到接收器之前動態更改/過濾事件。

因此,所有改變事件的業務邏輯都可以作爲自定義攔截器來實現,並且應該配置爲Flume通道。

僅供參考,您可以結算已有的native interceptors source code。這應該可以讓你對Flume攔截器框架有所瞭解。

這裏是ES Sink source code

樣品水槽配置

a1.sources = kafkaSource 
a1.sinks = ES_Sink 
a1.channels = channel1 

a1.sources.kafkaSource.interceptors = i1 
a1.sources.kafkaSource.interceptors.i1.type = org.apache.flume.interceptor.<Custom_Interceptor_name>$Builder 

a1.sinks.ES_Sink.channel = channel1 
a1.sinks.ES_Sink.type = elasticsearch 
a1.sinks.ES_Sink.hostNames = 127.0.0.1:9200 
+0

感謝您的反饋! –

相關問題