2014-07-01 98 views
1

我創建了一個spark工作,每天從我的hdfs讀入一個文本文件,並從文本文件的每一行中提取唯一鍵。每個文本文件中大約有50000個鍵。相同的數據然後通過提取的密鑰進行過濾並保存到hdfs。使用Spark多次寫入hadoop分佈式文件系統

我想在我的hdfs中創建一個目錄,其結構爲:hdfs://.../date/key,它包含已過濾的數據。問題在於寫入hdfs需要很長的時間,因爲密鑰太多了。

這是寫現在的方式:

val inputData = sparkContext.textFile(""hdfs://...", 2) 
val keys = extractKey(inputData) //keys is an array of approx 50000 unique strings 
val cleanedData = cleanData(inputData) //cleaned data is an RDD of strings 
keys.map(key => { 
    val filteredData = cleanedData.filter(line => line.contains(key)) 
    filteredData.repartition(1).saveAsTextFile("hdfs://.../date/key") 
}) 

有沒有辦法讓這個更快?我想過將數據重新分區爲提取的鍵的數量,但是我無法以格式hdfs://.../date/key保存。我也試過groupByKey,但是我不能保存這些值,因爲它們不是RDD。

任何幫助表示讚賞:)

+0

這個問題是重複的http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job – samthebest

+0

我正在尋找一種解決方案,使用* * saveAsTextFile **而不是saveAsHadoopFile,並將它們保存到單獨的目錄中,而不僅僅是具有不同名稱的不同文件。我已經實施了您所鏈接的解決方案。但具體而言,我想知道是否有更快的方式來創建多個目錄。 – akinos

+0

如果我有50,000個密鑰,並且我需要創建50,000個分區來映射每個密鑰,那麼鏈接到的解決方案仍然很慢。 – akinos

回答

0
def writeLines(iterator: Iterator[(String, String)]) = { 
    val writers = new mutalbe.HashMap[String, BufferedWriter] // (key, writer) map 
    try { 
    while (iterator.hasNext) { 
    val item = iterator.next() 
    val key = item._1 
    val line = item._2 
    val writer = writers.get(key) match { 
     case Some(writer) => writer 
     case None => 
     val path = arg(1) + key 
     val outputStream = FileSystem.get(new Configuration()).create(new Path(path)) 
     writer = new BufferedWriter(outputStream) 
    } 
    writer.writeLine(line) 
    } finally { 
    writers.values.foreach(._close()) 
    } 
} 

val inputData = sc.textFile()  
val keyValue = inputData.map(line => (key, line)) 
val partitions = keyValue.partitionBy(new MyPartition(10))  
partitions.foreachPartition(writeLines) 


class MyPartitioner(partitions: Int) extends Partitioner { 
    override def numPartitions: Int = partitions 

    override def getPartition(key: Any): Int = { 
     // make sure lines with the same key in the same partition 
     (key.toString.hashCode & Integer.MAX_VALUE) % numPartitions 
    } 
} 
0

我認爲這種方法應該是類似Write to multiple outputs by key Spark - one Spark job。分區號碼與目錄號碼無關。要實現它,您可能需要用您的自定義版本覆蓋generateFileNameForKeyValue以保存到不同的目錄。

關於可擴展性,它不是火花的問題,而是hdfs。但無論您如何實施,只要不改變要求,這是不可避免的。但我認爲Hdfs可能有50,000個文件處理程序

0

您正在爲輸入指定2個分區,爲輸出指定1個分區。這樣做的一個影響是嚴重限制了這些操作的並行性。爲什麼需要這些?

而不是計算50,000個過濾的RDD,這真的很慢,那麼直接按鍵分組怎麼樣?我知道你想將它們輸出到不同的目錄中,但這確實造成了瓶頸。有沒有另外一種方法來設計這個只是讓你閱讀(鍵,值)結果?