2017-06-13 80 views
0

我有兩個dataframes這樣的:添加隨機樣本來自一個火花數據幀到另一個

| User | 
------ 
| 1 | 
| 2 | 
| 3 | 

| Articles | 
---------- 
| 'A'  | 
| 'B'  | 
| 'C'  | 

什麼是每一個用戶2 articles隨機分配一個直觀的方式? 輸出數據幀可能是這樣的:

| User | Articles | 
----------------- 
| 1 | 'A'  | 
| 1 | 'C'  | 
| 2 | 'C'  | 
| 2 | 'B'  | 
| 3 | 'C'  | 
| 3 | 'A'  | 

這裏,將產生這兩個dataframes代碼:

u =[(1,), (2,), (3,)] 
rdd = sc.parallelize(u) 
users = rdd.map(lambda x: Row(user_id=x[0])) 
users_df = sqlContext.createDataFrame(users) 

a = [('A',), ('B',), ('C',), ('D',), ('E',)] 
rdd = sc.parallelize(a) 
articles = rdd.map(lambda x: Row(article_id=x[0])) 
articles_df = sqlContext.createDataFrame(articles) 
+0

文章數據框很大嗎? – Psidom

+0

它很小 - 有數百行。用戶大約在20萬行左右 – tchoedak

回答

0

因爲你的文章列表很小是有意義的保持它作爲一個Python對象,不作爲分佈式列表。這將允許您創建一個udf,爲每個user_id生成一個隨機的文章列表。以下是你可以做這樣一個辦法:

from random import sample,seed 
from pyspark.sql import Row 
from pyspark.sql.functions import udf,explode 
from pyspark.sql.types import ArrayType,StringType 

class ArticleRandomizer(object): 
    def __init__(self,article_list,num_articles=2,preseed=0): 
     self.article_list=article_list 
     self.num_articles=num_articles 
     self.preseed=preseed 
    def getrandom(self,user): 
     seed(user+self.preseed) 
     return sample(self.article_list,self.num_articles) 

u =[(1,), (2,), (3,)] 
rdd = sc.parallelize(u) 
users = rdd.map(lambda x: Row(user_id=x[0])) 
users_df = sqlContext.createDataFrame(users) 

a = [('A',), ('B',), ('C',), ('D',), ('E',)] 
#rdd = sc.parallelize(a) 
#articles = rdd.map(lambda x: Row(article_id=x[0])) 
#articles_df = sqlContext.createDataFrame(articles) 

article_list=[article[0] for article in a] 
ARandomizer=ArticleRandomizer(article_list) 
add_articles=udf(ARandomizer.getrandom,ArrayType(StringType())) 
users_df.select('user_id',explode(add_articles('user_id'))).show() 

ArticleRandomizer.getrandom功能由user_id播種,所以它是確定性的,這意味着你會得到的物品相同的隨機列表每次運行一個給定的用戶。您可以通過在實例化類時更改preseed值來調整此值,以獲得潛在的不同列表。

這還沒有經過測試,看它是否能夠很好地擴展,但它應該適用於您的數據集,因爲文章和用戶的維度都很小。

相關問題