
I read an RDD[String] from a file. How should I convert the RDD[String] into an RDD[(String, String)]?

val file = sc.textFile("/path/to/myData.txt") 

The format of myData is:

>str1_name 
ATCGGKFKKVKKFKRLFFVLFLRL 
FDJKALGFJVKRIKFKVKFGKLRL 
... 
FJDLALLLGL //the last line of str1 
>str2_name 
ATCGGKFKKVKKFKRLFFVLFLRL 
FDJKALGFJVKRIKFKVKFGKLRL 
... 
FJDLALLLGL //the last line of str2 
>str3_name 
... 

What should I do to convert the data from the file into the structure RDD[(String, String)]? For example,

trancRDD(
(str1_name, ATCGGKFKKVKKFKRLFFVLFLRLFDJKALGFJVKRIKFKVKFGKLRL), 
(str2_name, ATCGGKFKKVKKFKRLFFVLFLRLFDJKALGFJVKRIKFKVKFGKLRL), 
... 
) 

We have done something similar using a custom Hadoop input format, but it is non-trivial. If I were you, I would rather write a small program to convert the input into a format suitable for Spark. – maasg
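
For illustration, a minimal sketch of such a preprocessing program (the file names and the tab-separated one-record-per-line output format are assumptions, not something maasg specified):

import scala.io.Source 
import java.io.PrintWriter 

// Collapse each ">name" block into one tab-separated line: name \t sequence 
val out = new PrintWriter("myData.tsv") 
var name = "" 
val seq = new StringBuilder 
for (line <- Source.fromFile("myData.txt").getLines()) { 
  if (line.startsWith(">")) { 
    if (name.nonEmpty) out.println(s"$name\t$seq") 
    name = line.drop(1) 
    seq.clear() 
  } else seq ++= line.trim 
} 
if (name.nonEmpty) out.println(s"$name\t$seq") 
out.close() 

Spark can then read the result with a plain sc.textFile and split each line on the tab character.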


Since the transformation you want depends on elements before the current one (whether a previous line starts with ">"), it cannot be done independently per partition (the preceding ">" line may not be in the same partition). As @maasg said, some preprocessing to convert the file into the right format would be better. – Paul


Thank you guys! @maasg Paul – fanhk

Answers


If there is a well-defined record delimiter, such as the ">" noted above, this can be done with a custom Hadoop configuration:

import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.io.{LongWritable, Text} 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat 

val conf = new Configuration 
// split records on ">" instead of on newlines 
conf.set("textinputformat.record.delimiter", ">") 
// genome.txt contains the records provided in the question without the "..." 
val dataset = sc.newAPIHadoopFile("./data/genome.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf) 
// keep only the record text, dropping the byte-offset keys 
val data = dataset.map(x => x._2.toString) 

Let's take a look at the data:

data.collect 
res11: Array[String] = 
Array("", "str1_name 
ATCGGKFKKVKKFKRLFFVLFLRL 
FDJKALGFJVKRIKFKVKFGKLRL 
FJDLALLLGL 
", "str2_name 
ATCGGKFKKVKKFKRLFFVLFLRL 
FDJKALGFJVKRIKFKVKFGKLRL 
FJDLALLLGL 
") 

We can easily turn these strings into records:

val records = data.map { multiLine => 
  val lines = multiLine.split("\n") 
  (lines.head, lines.tail) 
} 
records.collect 
res14: Array[(String, Array[String])] = Array(("",Array()), 
     (str1_name,Array(ATCGGKFKKVKKFKRLFFVLFLRL, FDJKALGFJVKRIKFKVKFGKLRL, FJDLALLLGL)), 
     (str2_name,Array(ATCGGKFKKVKKFKRLFFVLFLRL, FDJKALGFJVKRIKFKVKFGKLRL, FJDLALLLGL))) 

(Using a filter to drop the first empty record is left as an exercise for the reader.)
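
For completeness, a minimal sketch of that exercise, also joining the sequence lines back into a single string so the result matches the RDD[(String, String)] asked for (the mkString join is an assumption about the desired output):

// Drop the empty record created by the leading ">", then 
// concatenate the sequence lines into one string per name 
val trancRDD = records 
  .filter { case (name, _) => name.nonEmpty } 
  .map { case (name, lines) => (name, lines.mkString) } 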
