2016-11-21 30 views
0

使用JSON生成/使用來自Kafka的卡。使用下面的屬性保存到HDFS在JSON:使用JsonConverter的Kafka Connect HDS Sink for JSON格式

key.converter=org.apache.kafka.connect.json.JsonConverter 
value.converter=org.apache.kafka.connect.json.JsonConverter 
key.converter.schemas.enable=false 
value.converter.schemas.enable=false 

監製:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \ 
     --data '{"schema": {"type": "boolean", "optional": false, "name": "bool", "version": 2, "doc": "the documentation", "parameters": {"foo": "bar" }}, "payload": true }' "http://localhost:8082/topics/test_hdfs_json" 

消費者:

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-hdfs/quickstart-hdfs.properties 

問題1:

key.converter.schemas.enable=true 

value.converter.schemas.enable=true 

獲取異常:

org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields 
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:332) 

問題2:

啓用上述兩個屬性不拋出任何問題,但沒有數據寫入了HDFS。

任何建議將高度讚賞。

感謝

回答

2

轉換器指的是如何將數據從卡夫卡的話題被翻譯成由連接器進行解釋,並寫入到HDFS。 HDFS連接器僅支持將HDFS寫入HDFS或實木複合地板。您可以找到有關如何將格式擴展爲JSON here的信息。如果你做了這樣的擴展,我鼓勵你將它貢獻給連接器的開源項目。

+0

感謝您的建議! –

+0

@dawsaw你知道這樣的擴展是否可以使用本地kafka connect api實現? –

+0

有一個已經與Kafka一起出貨的JsonConverter。我認爲這裏的問題是特定於HDFS連接器的輸出格式,這必然意味着擴展連接器,如果我已經正確理解了您的問題,則本身不會在Connect本身做任何事情。 – dawsaw

0

對於輸入JSON格式的消息被寫入到HDFS,請設置以下屬性

key.converter=org.apache.kafka.connect.storage.StringConverter 
value.converter=org.apache.kafka.connect.storage.StringConverter 
key.converter.schemas.enable=false 
value.converter.schemas.enable=false 
+0

將檢查Akshat。謝謝你的評論 –

相關問題