我已經定義了一個AVRO模式,併爲這些模式生成了一些使用avro工具的類。現在,我想將數據序列化到磁盤。我發現了一些關於scala的答案,但不適用於Java。 Article
類是使用avro-tools生成的,並且是由我定義的模式構建的。如何將數據序列化到Spark中的AVRO模式(使用Java)?
這裏是我如何努力去做代碼的簡化版本:
JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);
JavaRDD<Article> processingFiles = filesRDD.map(fileNameContent -> {
// The name of the file
String fileName = fileNameContent._1();
// The content of the file
String fileContent = fileNameContent._2();
// An object from my avro schema
Article a = new Article(fileContent);
Processing processing = new Processing();
// .... some processing of the content here ... //
processing.serializeArticleToDisk(avroFileName);
return a;
});
其中serializeArticleToDisk(avroFileName)
定義如下:
public void serializeArticleToDisk(String filename) throws IOException{
// Serialize article to disk
DatumWriter<Article> articleDatumWriter = new SpecificDatumWriter<Article>(Article.class);
DataFileWriter<Article> dataFileWriter = new DataFileWriter<Article>(articleDatumWriter);
dataFileWriter.create(this.article.getSchema(), new File(filename));
dataFileWriter.append(this.article);
dataFileWriter.close();
}
其中Article
是我的Avro的模式。現在
,映射器拋出我的錯誤:
java.io.FileNotFoundException: hdfs:/...path.../avroFileName.avro (No such file or directory)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
at org.apache.avro.file.SyncableFileOutputStream.<init>(SyncableFileOutputStream.java:60)
at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:129)
at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:129)
at sentences.ProcessXML.serializeArticleToDisk(ProcessXML.java:207)
. . . rest of the stacktrace ...
雖然文件路徑是正確的。
之後我使用了collect()
方法,因此map
函數中的其他所有內容都可以正常工作(序列化部分除外)。
我對Spark很新,所以我不確定這是否可能實際上是微不足道的。我懷疑我需要使用一些寫作功能,而不是寫在映射器中(雖然不確定這是否是真的)。任何想法如何解決這個問題?
編輯:
錯誤堆棧跟蹤我展示的最後一行,實際上是對這一部分:
dataFileWriter.create(this.article.getSchema(), new File(filename));
這是引發實際的錯誤的部分。我假設dataFileWriter
需要用別的東西替換。有任何想法嗎?
也許看看這裏的討論和答案:http://stackoverflow.com/questions/20612571/spark-writing-to-avro-file –
我已經看到一個,我更感興趣的Java等效。感謝您的評論! – Belphegor