2016-09-09 74 views
1

我正在嘗試使用flink將實木複合地板文件編寫爲實木複合地板。 我正在使用以下代碼並獲取錯誤。Flink轉換爲實木複合地板錯誤

val parquetFormat = new HadoopOutputFormat[Void, String](new AvroParquetOutputFormat, job) 
FileOutputFormat.setOutputPath(job, new Path(outputPath)) 

我得到以下的生成錯誤。有人可以幫忙嗎?

類型不匹配;發現:parquet.avro.AvroParquetOutputFormat 必需: org.apache.hadoop.mapreduce.OutputFormat [Void,String] ingestion.scala/flink-scala/src/main/scala/com/sc/edl/flink line 75 Scala問題

回答

1

要創建HadoopOutputFormat[Void, String]需要OutputFormat[Void, String]

您提供了一個AvroParquetOutputFormat,其延伸ParquetOutputFormat<IndexedRecord>ParquetOutputFormat定義爲ParquetOutputFormat<T> extends FileOutputFormat<Void, T>

所以你提供一個OutputFormat[Void, IndexedRecord]HadoopOutputFormat[Void, String]預計OutputFormat[Void, String]

你應該改變parquetFormat

val parquetFormat = new HadoopOutputFormat[Void, IndexedRecord](
    new AvroParquetOutputFormat, job) 
FileOutputFormat.setOutputPath(job, new Path(outputPath)) 

如果你想寫出DataSet(Void, IndexedRecord)型的沒有,你應該添加一個MapFunction您的數據轉換成(Void, IndexedRecord)雙。

+0

謝謝Fabian,很抱歉,但我對此有所瞭解,請您提供正確的語法或錯誤信息 – Niki

+0

我擴展了我的答案 –

1

仍然問題仍然存在,因爲Flink元組目前不支持NULL鍵。會發生 以下錯誤: Caused by: org.apache.flink.types.NullFieldException: Field 1 is null, but expected to hold a value.

更好的選擇是使用KiteSDK在這個例子說明: https://github.com/nezihyigitbasi/FlinkParquet 所以,如果你需要動態模式則這種做法是行不通的,因爲你需要堅持的模式嚴格。而且,這對於閱讀而言並不適合寫作。

Spark DataFrame不僅在API方面,而且在性能方面與Parquet的效果非常好。但如果你想使用Flink,那麼你需要等待flink社區發佈api或者編輯自己的parquet-hadoop代碼,這可能是一個很大的努力。

只有這些連接器實現尚未 https://github.com/apache/flink/tree/master/flink-connectors 所以,我個人的建議是,如果你可以使用火花所以要爲它,它有更多的成熟的API考慮生產用例。當你堅持使用flink的基本需求時,你可能會陷入別的地方。

不要浪費時間找到Flink的解決方案,我浪費了很多關鍵時間,而不是像Hive,Spark或MR這樣的標準選項。

相關問題