2014-10-29 68 views
0

我想統計哪個用戶查看哪個類別的頻率。我是新手SparkPython。這裏是演示數據:火花python減鍵

dataSource = sc.parallelize([("user1", "film"), ("user1", "film"), ("user2", "film"), ("user2", "books"), ("user2", "books")]) 

我減少了關鍵用戶和收集所有類別。然後我分裂後數:

dataReduced = dataSource.reduceByKey(lambda x,y : x + "," + y) 
catSplitted = dataReduced.map(lambda (user,values) : (values.split(","),user)) 

每個用戶的輸出格式是這樣的 - >([CAT1,CAT1,CAT2,catn],用戶)

可能有人請告訴我怎麼算用SparkPython或者你有不同的方法來解決這個問題?

回答

0

純Python:

ds = [('user1',['film','film','books']), ('user2',['film','books','books'])] 
ds1 = map(lambda (x,y):(x,tuple(set((z,y.count(z)) for z in y))),ds) 
print ds1 

回報:

[('user1', (('film', 2), ('books', 1))), ('user2', (('film', 1), ('books', 2)))] 

火花,它應該是如下(不知道因爲我沒有獲得現在的火花):

dataReduced = dataSource.reduceByKey(lambda x,y : x + "," + y) 
catSplitted = dataReduced.map(lambda (user,values) : (user, values.split(",")) 
catCounted = catSplitted.map(lambda (x,y):(x,tuple(set((z,y.count(z)) for z in y))) 

希望這會有所幫助。如果沒有,您可以嘗試使用spark命令檢查如何獲取python功能。基本邏輯應該可以工作

+0

謝謝,這似乎工作。你能否簡單地解釋我的最後一個lambda函數。哪個變量表示哪個值 - lambda(x,y):(x,tuple(set((z,y.count(z))for z)y) – user3756702 2014-10-30 09:30:41

+0

我創建了一個元組的元組, ('電影','電影','電影','電影','書籍')成爲(('電影',2),('電影',1))。然後設置刪除重複,然後將設置轉換回元組。 – haraprasadj 2014-10-30 09:34:21

1

現在我已經得到了我期望的結果。但是,我猜想像我這樣連接鑰匙並不好。也許有人有另一種解決方案或任何建議?

# count the categorie views per user 
# data 
dataSource = sc.parallelize([("user1", "film"), ("user1", "film"), ("user2", "film"), ("user2", "books"), ("user2", "books")]) 
# Create Key,Value | concatenate user and category as key 
dataKeyValue = dataSource.map(lambda (user,category) : (user+","+category, 1)) 
# reduce 
dataReduced = dataKeyValue.reduceByKey(lambda x,y : x + y) 
# result => [('user2,books', 2), ('user1,film', 2), ('user2,film', 1)] 
# split key 
cleanResult = dataReduced.map(lambda (key,value) : (key.split(","),value)) 
0

另一個(更高效和易讀的IMO)。 由於您的SPARK DAG不需要在基於用戶的分區後收集重新分區類別,並且易於用戶使用,因爲它使用數據框而不是RDD。

首先,基於用戶和類別的哈希列:

import pyspark.sql.functions as F 
df = spark.createDataFrame([("u1", "f"), ("u1", "f"), ("u2", "f"), ("u2", "b"), ("u2", "b")], schema=['u', 'c']) 
df = df.withColumn('hash', f.hash()) 

其次,我們通過散列分區,並在本地彙總:

from pyspark.sql import Window 
win = Window.partitionBy('hash') 
df.withColumns('count', F.count('hash').over(win)).distinct().show()