2016-04-11 13 views
1

我已經定義了一個AVRO模式,併爲這些模式生成了一些使用avro工具的類。現在,我想將數據序列化到磁盤。我發現了一些關於scala的答案,但不適用於Java。 Article類是使用avro-tools生成的,並且是由我定義的模式構建的。如何將數據序列化到Spark中的AVRO模式(使用Java)?

這裏是我如何努力去做代碼的簡化版本:

JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath); 
JavaRDD<Article> processingFiles = filesRDD.map(fileNameContent -> { 
    // The name of the file 
    String fileName = fileNameContent._1(); 
    // The content of the file 
    String fileContent = fileNameContent._2(); 

    // An object from my avro schema 
    Article a = new Article(fileContent); 

    Processing processing = new Processing(); 
    // .... some processing of the content here ... // 

    processing.serializeArticleToDisk(avroFileName); 

    return a; 
}); 

其中serializeArticleToDisk(avroFileName)定義如下:

public void serializeArticleToDisk(String filename) throws IOException{ 
    // Serialize article to disk 
    DatumWriter<Article> articleDatumWriter = new SpecificDatumWriter<Article>(Article.class); 
    DataFileWriter<Article> dataFileWriter = new DataFileWriter<Article>(articleDatumWriter); 
    dataFileWriter.create(this.article.getSchema(), new File(filename)); 
    dataFileWriter.append(this.article); 
    dataFileWriter.close(); 
} 

其中Article是我的Avro的模式。現在

,映射器拋出我的錯誤:

java.io.FileNotFoundException: hdfs:/...path.../avroFileName.avro (No such file or directory) 
at java.io.FileOutputStream.open0(Native Method)  
at java.io.FileOutputStream.open(FileOutputStream.java:270)  
at java.io.FileOutputStream.<init>(FileOutputStream.java:213) 
at java.io.FileOutputStream.<init>(FileOutputStream.java:162) 
at org.apache.avro.file.SyncableFileOutputStream.<init>(SyncableFileOutputStream.java:60) 
at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:129) 
at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:129) 
at sentences.ProcessXML.serializeArticleToDisk(ProcessXML.java:207)  
. . . rest of the stacktrace ... 

雖然文件路徑是正確的。

之後我使用了collect()方法,因此map函數中的其他所有內容都可以正常工作(序列化部分除外)。

我對Spark很新,所以我不確定這是否可能實際上是微不足道的。我懷疑我需要使用一些寫作功能,而不是寫在映射器中(雖然不確定這是否是真的)。任何想法如何解決這個問題?

編輯:

錯誤堆棧跟蹤我展示的最後一行,實際上是對這一部分:

dataFileWriter.create(this.article.getSchema(), new File(filename));

這是引發實際的錯誤的部分。我假設dataFileWriter需要用別的東西替換。有任何想法嗎?

+0

也許看看這裏的討論和答案:http://stackoverflow.com/questions/20612571/spark-writing-to-avro-file –

+0

我已經看到一個,我更感興趣的Java等效。感謝您的評論! – Belphegor

回答

1

該解決方案是不使用的數據幀,並且不拋出任何錯誤:

import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.avro.mapred.AvroKey; 
import org.apache.spark.api.java.JavaPairRDD; 
import scala.Tuple2; 

    . . . . . 

// Serializing to AVRO 
JavaPairRDD<AvroKey<Article>, NullWritable> javaPairRDD = processingFiles.mapToPair(r -> {  
    return new Tuple2<AvroKey<Article>, NullWritable>(new AvroKey<Article>(r), NullWritable.get()); 
}); 
Job job = AvroUtils.getJobOutputKeyAvroSchema(Article.getClassSchema()); 
javaPairRDD.saveAsNewAPIHadoopFile(outputDataPath, AvroKey.class, NullWritable.class, AvroKeyOutputFormat.class, 
     job.getConfiguration()); 

其中AvroUtils.getJobOutputKeyAvroSchema是:

public static Job getJobOutputKeyAvroSchema(Schema avroSchema) { 
    Job job; 

    try { 
     job = new Job(); 
    } catch (IOException e) { 
     throw new RuntimeException(e); 
    } 

    AvroJob.setOutputKeySchema(job, avroSchema); 
    return job; 
} 

類似的事情的Spark + Avro可以在這裏找到 - >https://github.com/CeON/spark-utils

0

看來你是以錯誤的方式使用Spark。

Map是一個轉換函數。只要撥打map不會調用RDD的調用。您必須致電動作,如forEach()collect()

另請注意,提供給map的lambda將在驅動程序中序列化並轉移到羣集中的某個Node

ADDED

嘗試使用星火SQL和Spark-Avro保存星火DataFrame在Avro的格式:

// Load a text file and convert each line to a JavaBean. 
JavaRDD<Person> people = sc.textFile("/examples/people.txt") 
    .map(Person::parse); 

// Apply a schema to an RDD 
DataFrame peopleDF = sqlContext.createDataFrame(people, Person.class); 
peopleDF.write() 
    .format("com.databricks.spark.avro") 
    .save("/output"); 
+0

你在說什麼 - 'map'絕對會調用'RDD'的計算。 'map'返回一個新的'RDD',其中所有的元素都是基於'map'函數重新計算的。 –

+0

@Denis Kokorin:之後我使用'collect()',所以'map'中的所有東西都可以工作,沒關係。除了序列化之外的任何東西都可以在'map'函數中使用。 – Belphegor

+0

也許他意味着你應該在「地圖」之後加上一個'foreach'並在那裏寫作?如果這個答案有示例代碼,這可能會有所幫助。 –

相關問題