2016-02-22 66 views
-1

我需要從存儲在HDFS中的製表符分隔文件中提取幾列。我需要使用pyspark過濾掉RDD中的一些內容

我能夠從HDFS讀入一個RDD,行分裂成列表,但我不知道如何讓我關心的列..

我的代碼:

raw_file = sc.textFile("hdfs.......tsv") 
rdd = raw_file.map(lambda line: line.split('\t')) 

newfile中的每一列都包含相同類型的內容,我想提取第26列,80,109,452列並將它們放入列表中。

我想:

filtered = rdd.filter(lambda line: append(line[26]), append(line[80]), append(line[109], append(line[452])).collect() 

但顯然沒有append方法。那麼我該怎麼做?

+0

拆分,讓你列表中的「行」,那麼你是否「HTTP」是在該列表中。你永遠不會檢查該列表中的元素是否以「http」開頭。 –

+0

你這樣做,但你說你想檢查列表是否包含以http開頭的項目。這有點不同,你不覺得嗎? –

+0

我只是想從pipelinedRDD提取四列 –

回答

0

過濾器用於根據某些條件返回或省略

rdd = sc.range(10) 
even = rdd.filter(lambda x: x % 2 == 0) 
even.collect() 
# Out: [0, 2, 4, 6, 8] 

你在找什麼是地圖。地圖用於轉換(或在您的情況下提取部分)一行。我們以字母列表作爲例子。如果我只想要第1,第3和第5個元素(列),我可以使用地圖來提取它們。

rdd = sc.range(2).map(lambda i: ["a", "b", "c", "d", "e", "f"]) 
print rdd.collect() 
# [['a', 'b', 'c', 'd', 'e', 'f'], ['a', 'b', 'c', 'd', 'e', 'f']] 

mapped = rdd.map(lambda r: [r[1], r[3], r[5]]) 
print mapped.collect() 
# [['b', 'd', 'f'], ['b', 'd', 'f']] 

過濾作用於行,地圖作用於數據

+0

謝謝!我會試試看 –