2015-10-07 59 views
2

我是Spark的新手,正在嘗試讀取csv文件並獲取文件中的第一列和第二列。儘管如此,csv文件是巨大的,我不想解析csv文件中的每一行。另外,運行collect()函數可能會導致進程崩潰,因爲內存可能不足以支持返回的數據量。所以我想知道是否可以用csv數據的一個子集創建一個RDD。例如,是否可以生成包含csv文件的第10至1000行的RDD並忽略其他行。如何獲取csv文件的子集作爲Spark RDD

現在,我只有

csvdata = sc.textFile("hdfs://nn:port/datasets/sample.csv").map(lambda line: line.split(",")) 

這基本上整個CSV文件創建一個RDD。是否可以從csvdata創建一個RDD,其中只包含10到1000行?

非常感謝您提供的幫助。

+0

http://stackoverflow.com/questions/15644859/how-to-read-specific-part-of-large-file-in-python – Ashalynd

回答

2

您可以通過索引加載所有和過濾器:

rdd = sc.parallelize(range(0, -10000, -1)) 
rdd.zipWithIndex().filter(lambda kv: 9 <= kv[1] < 999).keys() 

調整範圍取決於你如何定義10日線。

+0

This Works。我需要在textFile()的回答中替換parallelize()函數,並且它工作正常。非常感謝。 – thisisshantzz

0

RDD不是存儲在內存中的數據,而是有意對某些數據進行處理。當您調用終端操作時,如「收集」或「減少」,則Spark將處理數據。 Spark根據您在RDD上的操作歷史記錄,進行了一些巧妙的優化,從而限制了它必須完成的工作量。

(通過調用一個RDD一些操作,但是不能呼叫終端操作自己嘗試一下。沒有任何反應!)

所以,你可以做如(這是斯卡拉但是在Python不是太不相似)

val first10results: Array[Array[String]] = sc.textFile(filePath) 
     .map(f => f.split(",")) 
     .take(10) 

星火就知道了,因爲take(10),您只需要前10行。所以它只會從文件中取10行!簡單。

相關問題