2017-02-20 117 views
1

我一直在嘗試使用APACHE KAFKA和FLUME將數據流到MySQL數據庫。 (這是我的水槽的配置文件)流到mysql的流水線

agent.sources=kafkaSrc 
agent.channels=channel1 
agent.sinks=jdbcSink 

agent.channels.channel1.type=org.apache.flume.channel.kafka.KafkaChannel 
agent.channels.channel1.brokerList=localhost:9092 
agent.channels.channel1.topic=kafkachannel 
agent.channels.channel1.zookeeperConnect=localhost:2181 
agent.channels.channel1.capacity=10000 
agent.channels.channel1.transactionCapacity=1000 


agent.sources.kafkaSrc.type = org.apache.flume.source.kafka.KafkaSource 
agent.sources.kafkaSrc.channels = channel1 
agent.sources.kafkaSrc.zookeeperConnect = localhost:2181 
agent.sources.kafkaSrc.topic = kafka-mysql 

***agent.sinks.jdbcSink.type = How to declare this?*** 
agent.sinks.jdbcSink.connectionString = jdbc:mysql://1.1.1.1:3306/test 
agent.sinks.jdbcSink.username=user 
agent.sinks.jdbcSink.password=password 
agent.sinks.jdbcSink.batchSize = 10 
agent.sinks.jdbcSink.channel =channel1 
agent.sinks.jdbcSink.sqlDialect=MYSQL 
agent.sinks.jdbcSink.driver=com.mysql.jdbc.Driver 
agent.sinks.jdbcSink.sql=(${body:varchar}) 

我知道如何將數據傳送到Hadoop的HBase的或(記錄器型或HDFS型),但是找不到一個類型以流到MySQL數據庫。所以我的問題是我如何聲明jdbcSink.type?

+0

水槽中沒有JDBC接收器。使用Flume無法將數據傳輸到MySQL。 – franklinsijo

+0

@franklinsijo感謝您的回覆。那麼有什麼方法可以將數據從Kafka提取到RDBMS?我願意接受任何建議。 – SLIT

+0

[Kafka JDBC Sink](http://docs.confluent.io/3.1.1/connect/connect-jdbc/docs/sink_connector.html)是不可能的? – franklinsijo

回答

0

您可以隨時爲MySQL創建自定義接收器。這就是我們在FIWARE用Cygnus工具所做的。

隨意從它那裏得到啓發:https://github.com/telefonicaid/fiware-cygnus/blob/master/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIMySQLSink.java

它擴展了我們所有的匯這個其他自定義的基類:https://github.com/telefonicaid/fiware-cygnus/blob/master/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSISink.java

基本上,你必須擴展AbstractSink並實現Configurable接口。這意味着覆蓋人至少以下方法:

public Status process() throws EventDeliveryException 

和:分別

public void configure(Context context) 

+0

非常感謝。 – SLIT