2016-08-12 149 views
0

我有一個java卡夫卡消費者,從卡夫卡消費avro數據[說主題x]。它應該將這些數據推送到HDFS,因爲它不需要代碼生成。在Avro公司documentation他們使用類似以下內容:寫AVRO數據到Hadoop hdfs

GenericRecord e1 = new GenericData.Record(schema);  
e1.put("key", "value"); 

DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema); 

DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter); 
dataFileWriter.create(schema, new File("<HDFS file path>")); 

dataFileWriter.append(e1); 
dataFileWriter.close(); 

問題的,這是,我已經有Avro的數據。要使用這一系列步驟,我必須在反序列化avro數據包之後提取每個鍵 - 值對,然後將其推送到GenericRecord對象,我認爲這沒有任何意義。我沒有找到任何我想要實現的例子。任何提示或鏈接到相關文件非常感謝。

+2

您有沒有考慮使用Kafka內置的Kafka Connect框架結合現有的Kafka-> HDFS連接器,如[kafka-connect-hdfs](https:// github .COM/confluentinc /卡夫卡連接-HDFS)?鏈接的HDFS接收器連接器支持Avro開箱即用。 –

+0

由於一些模式註冊問題,我們已經從融合中移除。所以我不能使用融合框架。 – Bitswazsky

+0

關心詳細說明您已經遇到的模式註冊表問題?當然也可以通過https://github.com/confluentinc/schema-registry/issues完成。 –

回答

0

如果我正確理解你的問題,我建議你嘗試com.twitter.bijection.Injection和com.twitter.bijection.avro.GenericAvroCodecs軟件包,例如。

看看這裏http://aseigneurin.github.io/2016/03/04/kafka-spark-avro-producing-and-consuming-avro-messages.html。 那裏,在卡夫卡生產者的GenericRecord被轉換爲字節[],這是放在卡夫卡話題,然後在消費者這個字節根據您的架構倒置成一個GenericRecord。而且您不需要將值記錄到記錄中的所有字段。之後,您可以將此記錄寫入文件。

而且,您也可能需要以其他方式訪問HDFS中的文件,因爲您無法爲其創建File實例。