2017-10-15 140 views
0

我有一個數據集,其中包含工人與他們的人口統計信息,如年齡性別,地址等及其工作地點。我從數據集創建了一個RDD並將其轉換爲DataFrame。計算pyspark中數據框的所有行之間的餘弦相似度

每個ID有多個條目。因此,我創建了一個DataFrame,其中只包含工作人員的ID和他/她工作的各個辦公地點。

|----------|----------------| 
    | **ID** **Office_Loc** | 
    |----------|----------------| 
    | 1  |Delhi, Mumbai, | 
    |   | Gandhinagar | 
    |---------------------------| 
    | 2  | Delhi, Mandi | 
    |---------------------------| 
    | 3  |Hyderbad, Jaipur| 
    ----------------------------- 

我想根據他們的辦公地點計算每個工人與其他工人之間的餘弦相似度。

所以,我通過數據幀的行迭代中,從數據幀中檢索的單個行:

myIndex = 1 
values = (ID_place_df.rdd.zipWithIndex() 
      .filter(lambda ((l, v), i): i == myIndex) 
      .map(lambda ((l,v), i): (l, v)) 
      .collect()) 

,然後使用地圖

cos_weight = ID_place_df.select("ID","office_location").rdd\ 
    .map(lambda x: get_cosine(values,x[0],x[1])) 

到計算的餘弦相似性所提取的行之間和整個DataFrame。

我不認爲我的方法是一個很好的方法,因爲我遍歷DataFrame的行,它打敗了使用spark的全部目的。 在pyspark有更好的方法嗎? 請提醒。

+0

我想了一會兒問題。通常最好的做法是用最簡單的案例來問問你是否得到同樣的問題。 – ChaosPredictor

回答

1

您可以使用mllib包來計算每行TF-IDF的L2範數。然後乘以表本身由兩個L2規範,以獲得餘弦相似性的兩個點積:

1 RDD

rdd = sc.parallelize([[1, "Delhi, Mumbai, Gandhinagar"],[2, " Delhi, Mandi"], [3, "Hyderbad, Jaipur"]]) 
  • 計算TF-IDF

    documents = rdd.map(lambda l: l[1].replace(" ", "").split(",")) 
    
    from pyspark.mllib.feature import HashingTF, IDF 
    hashingTF = HashingTF() 
    tf = hashingTF.transform(documents) 
    

Y您可以指定HashingTF中的特徵數量以使特徵矩陣更小(更少的列)。

tf.cache() 
    idf = IDF().fit(tf) 
    tfidf = idf.transform(tf) 
  • 計算L2規範:

    from pyspark.mllib.linalg.distributed import IndexedRowMatrix 
    mat = IndexedRowMatrix(data).toBlockMatrix() 
    dot = mat.multiply(mat.transpose()) 
    dot.toLocalMatrix().toArray() 
    
        array([[ 0.  , 0.  , 0.  , 0.  ], 
          [ 0.  , 1.  , 0.10794634, 0.  ], 
          [ 0.  , 0.10794634, 1.  , 0.  ], 
          [ 0.  , 0.  , 0.  , 1.  ]]) 
    

    OR::使用笛卡爾積和功能

    from pyspark.mllib.feature import Normalizer 
    labels = rdd.map(lambda l: l[0]) 
    features = tfidf 
    
    normalizer = Normalizer() 
    data = labels.zip(normalizer.transform(features)) 
    
  • 計算通過使基質與自身相乘的餘弦相似度dot在numpy陣列:

    data.cartesian(data)\ 
        .map(lambda l: ((l[0][0], l[1][0]), l[0][1].dot(l[1][1])))\ 
        .sortByKey()\ 
        .collect() 
    
        [((1, 1), 1.0), 
        ((1, 2), 0.10794633570596117), 
        ((1, 3), 0.0), 
        ((2, 1), 0.10794633570596117), 
        ((2, 2), 1.0), 
        ((2, 3), 0.0), 
        ((3, 1), 0.0), 
        ((3, 2), 0.0), 
        ((3, 3), 1.0)] 
    

2。據幀

既然你似乎可以用dataframes,你可以使用spark ml包代替:

import pyspark.sql.functions as psf 
df = rdd.toDF(["ID", "Office_Loc"])\ 
    .withColumn("Office_Loc", psf.split(psf.regexp_replace("Office_Loc", " ", ""), ',')) 
  • 計算TF-IDF:

    from pyspark.ml.feature import HashingTF, IDF 
    hashingTF = HashingTF(inputCol="Office_Loc", outputCol="tf") 
    tf = hashingTF.transform(df) 
    
    idf = IDF(inputCol="tf", outputCol="feature").fit(tf) 
    tfidf = idf.transform(tf) 
    
  • 計算L2規範:

    from pyspark.ml.feature import Normalizer 
    normalizer = Normalizer(inputCol="feature", outputCol="norm") 
    data = normalizer.transform(tfidf) 
    
  • 計算矩陣乘積:

    from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix 
    mat = IndexedRowMatrix(
        data.select("ID", "norm")\ 
         .rdd.map(lambda row: IndexedRow(row.ID, row.norm.toArray()))).toBlockMatrix() 
    dot = mat.multiply(mat.transpose()) 
    dot.toLocalMatrix().toArray() 
    

    OR:使用連接和用於功能dot一個UDF

    dot_udf = psf.udf(lambda x,y: float(x.dot(y)), DoubleType()) 
    data.alias("i").join(data.alias("j"), psf.col("i.ID") < psf.col("j.ID"))\ 
        .select(
         psf.col("i.ID").alias("i"), 
         psf.col("j.ID").alias("j"), 
         dot_udf("i.norm", "j.norm").alias("dot"))\ 
        .sort("i", "j")\ 
        .show() 
    
        +---+---+-------------------+ 
        | i| j|    dot| 
        +---+---+-------------------+ 
        | 1| 2|0.10794633570596117| 
        | 1| 3|    0.0| 
        | 2| 3|    0.0| 
        +---+---+-------------------+ 
    

本教程列出不同的方法來繁殖大型矩陣: https://labs.yodas.com/large-scale-matrix-multiplication-with-pyspark-or-how-to-match-two-large-datasets-of-company-1be4b1b2871e

+0

謝謝你的回答。我非常感謝幫助。但是代碼給了我一個錯誤'要求失敗:輸入列必須是ArrayType,但是得到了StringType.''。在使用數據幀時進行hashingTF轉換期間。 –

+0

您必須首先將字符串列表拆分爲單詞列表。我添加了關於如何創建'df' – MaFF

+0

的部分嗨,它在我使用'data.cartesian(data)\ .map(lambda l:((l [0] [0],l [1] [0 ]),l [0] [1] .dot(l [1] [1])))\ .sortByKey()\ 。take(5)'。但是當我使用mllib代碼並將blockMatrix轉換爲LocalMatrix時,它給了'u'requirement失敗:值數組的長度必須小於Int.MaxValue。目前numRows * numCols:1006095879729669481''我不明白,因爲我正在採取一小部分數據(約10個ID),所以numRows * numCols:100。 –