2014-05-12 96 views
1

我是Spark & Scala新手。從Spark RDD讀取Kryo文件

我需要閱讀和分析星火,它寫在KRYO我的Scala代碼文件系列化:

import com.esotericsoftware.kryo.Kryo 
import com.esotericsoftware.kryo.io.Output 

val kryo:Kryo = new Kryo() 
val output:Output = new Output(new FileOutputStream("filename.ext",true)) 

//kryo.writeObject(output, feed) (tested both line) 
kryo.writeClassAndObject(output, myScalaObject) 

這是創造我的對象(myScalaObject)的文件序列化的僞碼這是一個複雜的對象。

文件似乎寫的很好,但我有問題,當我在星火RDD閱讀

僞代碼星火:

val conf = new SparkConf() 
    .setMaster("local") 
    .setAppName("My application") 
    .set("spark.executor.memory", "1g") 


conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
conf.set("spark.kryo.registrator", "myScalaObject") 

val sc = new SparkContext(conf) 

val file=sc.objectFile[myScalaObject]("filename.ext") 

val counts = file.count() 

當我試圖執行它,我收到此錯誤:

org.apache.spark.SparkException:作業中止:任務0.0:0失敗1次(最近的失敗:異常故障:java.io.IOException的:文件:FILENAME.EXT不是SequenceFile)

是否可以在Spark中讀取這種類型的文件?

如果這種解決方案是不可能的,那麼創建一個複雜的文件結構來讀取Spark的好方法是什麼?

謝謝

+1

'objectFile'用於加載保存爲含有序列化對象一個SequenceFile一個'RDD'。爲什麼不使用Kryo讀取對象並使用'parallel'來生成'RDD'? – zsxwing

+0

@zsxwing謝謝你,很好主意,我試了一下。但我有很多小的(5-20​​mb),並不想並行化文件的內容。有沒有什麼方法可以並行化文件名,然後每個服務器讀取它的文件? – faster2b

+1

用文件名創建一個RDD並用'map'讀取內容? – zsxwing

回答

2

如果你想與objectFile閱讀,與saveAsObjectFile寫出來的數據。

val myObjects: Seq[MyObject] = ... 
val rddToSave = sc.parallelize(myObjects) // Or better yet: construct as RDD from the start. 
rddToSave.saveAsObjectFile("/tmp/x") 
val rddLoaded = sc.objectFile[MyObject]("/tmp/x") 

另外,作爲zsxwing說,你可以創建一個RDD文件名,並使用map讀取每個的內容。如果希望每個文件被讀入一個單獨的分區,並行化的文件名到單獨的分區:

def loadFiles(filenames: Seq[String]): RDD[Object] = { 
    def load(filename: String): Object = { 
    val input = new Input(new FileInputStream(filename)) 
    return kryo.readClassAndObject(input) 
    } 
    val partitions = filenames.length 
    return sc.parallelize(filenames, partitions).map(load) 
} 
+1

你現在也可以使用'sc.wholeTextFiles'。我必須在某個時候更新答案。 –