2016-03-28 56 views
0

我嘗試瞭解RDD如何工作。例如,我想根據某些RDD對象中的上下文來計算行數。我有一些的遭遇與DataFrames和我的DF碼,裏面有例如列AB,可能有一些其他列,正在尋找這樣的:計算rdd中的行取決於行上下文,pyspark

df = sqlContext.read.json("filepath") 
df2 = df.groupBy(['A', 'B']).count() 

這段代碼的邏輯部分是清楚的,我 - 我做的對DF中的列名稱操作groupBy。在RDD中,我沒有一些列名,只是類似的行,它可能是一個元組或行對象...我如何計算相似的元組並將其作爲整數添加到唯一行?例如我的第一個代碼是:

df = sqlContext.read.json("filepath") 
rddob = df.rdd.map(lambda line:(line.A, line.B)) 

我做的地圖操作和按鍵AB創造價值的一個元組。獨特的行不再有任何鍵(這是與DataFrame,它有列名稱最重要的區別)。 現在我可以生成這樣的東西,但它只計算RDD中的總數。

rddcalc = rddob.distinct().count() 

我要爲我的輸出是什麼,就是:

((a1, b1), 2) 
((a2, b2), 3) 
((a2, b3), 1) 
... 

PS

我發現我對這個問題的個人解決方案。這裏:rdd是初始rdd,rddlist是所有行的列表,rddmod是最終修改的rdd,因此也是解決方案。

rddlist = rdd.map(lambda line:(line.A, line.B)).map(lambda line: (line, 1)).countByKey().items() 
rddmod = sc.parallelize(rddlist) 
+0

實際上'groupBy'不推薦使用,因爲它需要對分區進行混洗,因此會在所有節點間移動很多數據。 –

+0

@Alberto Bonsanto,感謝您對本主題的興趣。我不認爲這組對於DF和RDD是危險的,它不存在。 – Guforu

+0

好吧,你可以在這裏找到* databricks *解釋的一些原因[優先reduceByKey而不是groupByKey](https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html) –

回答

1

我相信你在找什麼是reduceByKey。這會給你計算每條不同的(a,b)行出現的次數。 它應該是這樣的:

rddob = df.rdd.map(lambda line: (line.A + line.B, 1)) 
counts_by_key = rddob.reduceByKey(lambda a,b: a+b) 

現在,您將有重點,形式的值對: ((a,b), count-of-times-pair-appears)

請注意,這僅適用如果A和B都是字符串。如果它們是列表,則必須創建一個「主鍵」類型的對象來執行reduce。主鍵是一些複雜的對象時,您不能執行reduceByKey

+0

的幫助下,感謝您的回覆。其實我已經找到了解決方案,但沒有在這裏公開。您幾乎是正確的,我創建了鍵(A和B值的唯一組合),但將countByKey用作最終功能。無論如何,非常感謝您 – Guforu

+0

您應該發佈您的解決方案,以便其他人可以使用它。 – PinoSan

+0

@PinoSan,好吧,我已經更新了我的第一篇文章 – Guforu