2017-06-14 322 views
2

我有一個DF,其「產品」列中列出象下面這樣:如何在dataframe spark的一列中獲取列表的長度?

+----------+---------+--------------------+ 
|member_srl|click_day|   products| 
+----------+---------+--------------------+ 
|  12| 20161223| [2407, 5400021771]| 
|  12| 20161226|  [7320, 2407]| 
|  12| 20170104|    [2407]| 
|  12| 20170106|    [2407]| 
|  27| 20170104|  [2405, 2407]| 
|  28| 20161212|    [2407]| 
|  28| 20161213|  [2407, 100093]| 
|  28| 20161215|   [1956119]| 
|  28| 20161219|  [2407, 100093]| 
|  28| 20161229|   [7905970]| 
|  124| 20161011|  [5400021771]| 
|  6963| 20160101|   [103825645]| 
|  6963| 20160104|[3000014912, 6626...| 
|  6963| 20160111|[99643224, 106032...| 

如何添加一個新列product_cnt這是products列表的長度?以及如何過濾df以獲得具有給定產品長度條件的指定行? 謝謝。

回答

0

第一個問題

如何添加一個新列product_cnt它們的產品列表的長度?

>>> a = [(12,20161223, [2407,5400021771]),(12,20161226,[7320,2407])] 
>>> df = spark.createDataFrame(a, 
["member_srl","click_day","products"]) 
>>> df.show() 
+----------+---------+------------------+ 
|member_srl|click_day|   products| 
+----------+---------+------------------+ 
|  12| 20161223|[2407, 5400021771]| 
|  12| 20161226|[7320, 2407, 4344]| 
+----------+---------+------------------+ 

你可以找到一個類似的例子here

>>> from pyspark.sql.types import IntegerType 
>>> from pyspark.sql.functions import udf 

>>> slen = udf(lambda s: len(s), IntegerType()) 

>>> df2 = df.withColumn("product_cnt", slen(df.products)) 
>>> df2.show() 
+----------+---------+------------------+-----------+ 
|member_srl|click_day|   products|product_cnt| 
+----------+---------+------------------+-----------+ 
|  12| 20161223|[2407, 5400021771]|   2| 
|  12| 20161226|[7320, 2407, 4344]|   3| 
+----------+---------+------------------+-----------+ 

第二個問題:給定產品長度的條件

以及如何過濾DF獲得指定行?

您可以使用過濾功能docs here

>>> givenLength = 2 
>>> df3 = df2.filter(df2.product_cnt==givenLength) 
>>> df3.show() 
+----------+---------+------------------+-----------+ 
|member_srl|click_day|   products|product_cnt| 
+----------+---------+------------------+-----------+ 
|  12| 20161223|[2407, 5400021771]|   2| 
+----------+---------+------------------+-----------+ 
3

Pyspark有一個內置的功能來實現你想要什麼叫sizehttp://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.size。 要將其添加爲列,您可以在選擇語句期間簡單地調用它。

from pyspark.sql.functions import size 

countdf = df.select('*',size('products').alias('product_cnt')) 

過濾工作完全按照@ titiro89描述。此外,您可以在過濾器中使用size函數。這將允許您以下面的方式繞過添加額外的列(如果您希望這麼做)。

filterdf = df.filter(size('products')==given_products_length) 
相關問題