我試圖使用Kafka Connect從Oracle數據庫接收數據。 Kafka連接器提供的默認對象是「GenericRecord」類型。這使得它的方式過於具體,導致通過執行record.getAsString(「someIDENTIFIER」)獲取數據的情況。是否有可能獲取特定類型的對象而不是GenericRecord類型。Kafka Connect集成
回答
卡夫卡連接信號源連接器與SourceRecord
對象一起工作,並且卡夫卡連接工人被配置爲使用一個變換器序列化的SourceRecord
爲二進制形式,其隨後寫入到卡夫卡主題。 Kafka Connect附帶JSON轉換器,Confluent提供Avro轉換器。所以,寫給Kafka的消息的二進制形式取決於你使用的是哪個轉換器。
(同樣地,水槽連接器與SinkRecord
對象一起工作,並且卡夫卡連接工人使用其轉換器反序列化從卡夫卡讀取消息的二進制形式爲SinkRecord
對象與所述連接器的交易。)
聽起來就像你正在寫一個卡夫卡消費者,並在那裏看到GenericRecord
對象。如果是這樣,那麼你可能已經配置了卡夫卡連接工作者使用匯合的Avro的轉換器,這對於信號源連接器像JDBC連接器SourceRecord
轉換成Avro的二進制格式卡夫卡連接,然後寫入到卡夫卡的話題。您的客戶端,然後有可能使用與Avro的解串器配置了卡夫卡的消費者,除非你給解串器的Avro的模式與它的工作將反序列化的Avro編碼消息到Avro的GenericRecord
。
但是,您可以將應用程序配置爲了解Avro架構的特定版本,並讓構建系統爲該版本的Avro架構生成代碼,以創建將反序列化Avro編碼的特定代碼消息轉換爲由模式描述的內存中形式。在Java中,這意味着您需要從模式生成類,然後在代碼中使用生成的類將GenericRecord
複製到類的實例中。請參閱this complete consumer example,特別是this line,用於從GenericRecord
轉換。在這個例子中,LogLine
從Avro的模式生成的類:Avro公司的
GenericRecord genericEvent = (GenericRecord) messageAndMetadata.message();
LogLine event = (LogLine) SpecificData.get().deepCopy(LogLine.SCHEMA$, genericEvent);
一個顯著的好處是,它直接支持模式演化,並匯合的模式註冊採取的這種優勢。因此,儘管源連接器可能會演變生成的表的Avro模式,以響應數據庫中表的結構更改,但只要數據庫模式發生變化以便Avro模式向後兼容,則Avro庫您的客戶端應用程序使用將自動從消息的Avro模式轉換爲您的應用程序使用的Avro模式。
當然,在某些時候,你會改變你的應用程序使用新的Avro的模式,但是,這並不必須是在同一時間。事實上,如果你配置模式註冊爲強制執行模式版本是向前和向後兼容,你可以或前更改客戶端應用程序後數據庫中更改和JDBC源連接器開始使用的Avro的新版本架構。
謝謝你這麼多你的迴應。但我正在使用Scala編寫消費者。另外,我們使用Avro生成所有模式並生成相應的類。即使在使用scala或者有不同的方式來模擬這個時,這可以實現嗎? –
請參閱[此問題](https://stackoverflow.com/questions/31763571/importing-avro-schema-in-scala),瞭解如何使用[avro4s庫](https:/ /github.com/sksamuel/avro4s)來生成你的課程。也可能有其他的圖書館。 –
非常感謝你... –
- 1. Kafka Connect Logstash
- 2. Kafka Connect MySQL
- 3. OpenID Connect集成
- 4. Kafka Connect的用例
- 5. Kafka Connect Api - 入門?
- 6. Apache Flink Kafka集成
- 7. MSSQL和KAFKA集成
- 8. Ignite和Kafka集成
- 9. Kafka Connect警報選項?
- 10. Kafka Connect + Zookepeer沒有連接
- 11. 將informatica與Kafka集成
- 12. webpack和kafka-node集成
- 13. 如何集成Oracle和Kafka
- 14. angular.js與apache kafka的集成
- 15. Spring集成 - Apache ActiveMQ到Kafka
- 16. Spark流和kafka集成
- 17. 將Apache kafka與Angular.js集成
- 18. spark,kafka集成問題:對象kafka不是org.apache.spark.streaming的成員
- 19. 春季集成kafka如何處理生成kafka時的錯誤
- 20. Avro Records - > Kafka - > Kafka Connect Sink - > Amazon S3 Storage。冪等?
- 21. Apache-Kafka-Connect,Confluent-HDFS-Connector,未知魔術字節,Kafka-To-Hdfs
- 22. Kafka Connect SourceTask的輪詢時間間隔
- 23. 是否有Kafka Connect Python客戶端?
- 24. Kafka Connect HDFS接收器問題
- 25. 使用kafka connect的最佳實踐
- 26. Kafka Connect - 文件源連接器錯誤
- 27. Kafka Connect接收器分區:子分區?
- 28. 結構化流+ Kafka集成 - SSL和Kerberos集成?
- 29. Apache Kafka與tomcat和spring的集成
- 30. 爲Kafka實現Spring集成InboundChannelAdapter
您正在使用哪個Connect插件? JDBC? DBVisit?加洲的金門大橋? –
我正在使用融合JDBC。 「connector.class」:「io.confluent.connect.jdbc.JdbcSourceConnector」 –