2015-09-24 77 views
1

我正在嘗試開發一個Java Spark應用程序,該應用程序從HDFS中讀取AVRO記錄(https://avro.apache.org/),並使用名爲Gobblin(https://github.com/linkedin/gobblin/wiki)的技術。如何使用Java讀取Spark 1.3.1中的AVRO數據?

樣品HDFS AVRO數據文件:

/gobblin/work/job-output/KAFKA/kafka-gobblin-hdfs-test/20150910213846_append/part.task_kafka-gobblin-hdfs-test_1441921123461_0.avro

不幸的是,我發現用Java編寫的例子有限。

我發現是用Scala編寫的最好的事情(使用Hadoop版本1個庫)。

任何幫助,將不勝感激。

目前我想到了用下面的代碼,雖然我是如何從我的AVRO數據中提取值的HashMap的不確定:

JavaPairRDD avroRDD = sc.newAPIHadoopFile( 
    path, 
    AvroKeyInputFormat.class, 
    AvroKey.class, 
    NullWritable.class, 
    new Configuration()); 

// JavaPairRDD avroRDD = sc.newAPIHadoopFile( 
// path, 
// AvroKeyValueInputFormat.class, 
// AvroKey.class, 
// AvroValue.class, 
// new Configuration()); 

我目前的Maven依賴:

<dependencies> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.10</artifactId> 
     <version>1.3.1</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.avro</groupId> 
     <artifactId>avro</artifactId> 
     <version>1.7.6</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.avro</groupId> 
     <artifactId>avro-mapred</artifactId> 
     <version>1.7.6</version> 
     <classifier>hadoop2</classifier> 
    </dependency> 
    <dependency> 
     <groupId>com.fasterxml.jackson.core</groupId> 
     <artifactId>jackson-annotations</artifactId> 
     <version>2.4.3</version> 
    </dependency> 


    <dependency> 
     <groupId>org.slf4j</groupId> 
     <artifactId>slf4j-api</artifactId> 
     <scope>provided</scope> 
    </dependency> 

    <dependency> 
     <groupId>org.slf4j</groupId> 
     <artifactId>slf4j-log4j12</artifactId> 
     <scope>provided</scope> 
    </dependency> 

    <dependency> 
     <groupId>junit</groupId> 
     <artifactId>junit</artifactId> 
     <scope>test</scope> 
    </dependency> 

</dependencies> 
+1

爲什麼不使用['spark-avro'](https://github.com/databricks/spark-avro)(v.0.0.0)?像這樣的東西應該可以工作:'HashMap options = new HashMap (); options.put(「path」,path); DataFrame df = sqlContext.load(「com.databricks.spark.avro」,options);' – zero323

+0

理想情況下,只需要針對標準Spark Java API編寫代碼。所以除了Avro的依賴關係之外,我不想使用任何外部庫。 – Mark

+0

另一本名爲「Hadoop應用程序體系結構」[Hadoop應用程序體系結構JavaSessionize示例]的書籍(https://github.com/hadooparchitecturebook/hadoop-arch-book/blob/master/ch08-clickstream/JavaSessionize/src/main /java/com/hadooparchitecturebook/clickstream/JavaSessionize.java),但是,我寧願不必編寫「* .avsc」文件,並依賴avro-maven插件來生成所需的類文件。 – Mark

回答

2

我寫了一個小樣本,能夠讀取我的樣本Gobblin Avro記錄,並使用Spark輸出相關結果(spark-hdfs-avro-test)。值得一提的是,我需要解決幾個問題。 任何意見或反饋將不勝感激。

問題1:沒有與當前的Avro版本(1.7.7)和Java序列化問題:

引述:

Spark依靠Java的Serializable接口來序列化對象。 Avro對象不實現Serializable。因此,要使用Spark中的Avro對象,您需要繼承Avro生成的類並實現Serializable,例如, https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com/zenfractal/SerializableAminoAcid.java

爲了解決這個問題,我寫我自己的序列化包裝類:

問題2:我的Avro的消息不包含一個 「關鍵」值。

不幸的是,我無法使用外的開箱任何輸入格式,不得不寫我自己:AvroValueInputFormat

public class AvroValueInputFormat<T> extends FileInputFormat<NullWritable, AvroValue<T>> { 

我無法使用以下命令:

# org.apache.avro.mapreduce.AvroKeyInputFormat 
public class AvroKeyInputFormat<T> extends FileInputFormat<AvroKey<T>, NullWritable> { 

# org.apache.avro.mapreduce.AvroKeyValueInputFormat 
public class AvroKeyValueInputFormat<K, V> extends FileInputFormat<AvroKey<K>, AvroValue<V>> { 

問題3:我無法使用AvroJob類設置器來設置模式值,我不得不手動執行此操作。

hadoopConf.set("avro.schema.input.key", Schema.create(org.apache.avro.Schema.Type.NULL).toString()); //$NON-NLS-1$ 
    hadoopConf.set("avro.schema.input.value", Event.SCHEMA$.toString()); //$NON-NLS-1$ 
    hadoopConf.set("avro.schema.output.key", Schema.create(org.apache.avro.Schema.Type.NULL).toString()); //$NON-NLS-1$ 
    hadoopConf.set("avro.schema.output.value", SeverityEventCount.SCHEMA$.toString()); //$NON-NLS-1$ 
相關問題