2017-02-06 28 views
2

我有一個很大的字符串ID數據集,它可以放入我的Spark集羣中的單個節點上的內存中。問題是它消耗了單個節點的大部分內存。PySpark中的內存高效笛卡爾連接

這些ID約30個字符長。例如:

ids 
O2LWk4MAbcrOCWo3IVM0GInelSXfcG 
HbDckDXCye20kwu0gfeGpLGWnJ2yif 
o43xSMBUJLOKDxkYEQbAEWk4aPQHkm 

我正在尋找寫入文件的所有對ID的列表。例如:

id1,id2 
O2LWk4MAbcrOCWo3IVM0GInelSXfcG,HbDckDXCye20kwu0gfeGpLGWnJ2yif 
O2LWk4MAbcrOCWo3IVM0GInelSXfcG,o43xSMBUJLOKDxkYEQbAEWk4aPQHkm 
HbDckDXCye20kwu0gfeGpLGWnJ2yif,O2LWk4MAbcrOCWo3IVM0GInelSXfcG 
# etc... 

所以我需要交叉連接數據集本身。我希望在使用10節點羣集的PySpark上執行此操作,但它需要具有高效的內存。

+0

數據集包含多少個記錄?每個節點有多少內存?你使用純RDD還是Dataframes API? – Mariusz

+0

@Mariusz現在,這是在主文本文件中,但是當我將它讀入內存中的python列表時,它消耗了8GB RAM中的80%。列表長度約爲100M記錄。我可以將數據集放入RDD或Dataframe中。 – mgoldwasser

回答

3

pySpark將輕鬆處理您的數據集並提高內存效率,但處理10^8 * 10^8條記錄(這是交叉連接結果的估計大小)需要一些時間。請參閱示例代碼:

from pyspark.sql.types import * 
df = spark.read.csv('input.csv', header=True, schema=StructType([StructField('id', StringType())])) 
df.withColumnRenamed('id', 'id1').crossJoin(df.withColumnRenamed('id', 'id2')).show()