2017-04-25 26 views
1

這造成我的示例數據幀:火花數據幀更新列,而其他科拉姆是像pyspark

df = sc.parallelize([('abc',),('def',)]).toDF() #(
df = df.selectExpr("_1 as one",) 
df = df.withColumn("two", lit('z')) 
df.show() 

看起來像這樣:

+---+---+ 
|one|two| 
+---+---+ 
|abc| z| 
|def| z| 
+---+---+ 

現在我想要做的是一系列SQL where like聲明其中列two被追加,無論它是否匹配

在「僞代碼」中,它看起來像這樣:

for letter in ['a','b','c','d']: 
    df = df['two'].where(col('one').like("%{}%".format(letter))) += letter 

最終導致DF看起來像這樣:

+---+----+ 
|one| two| 
+---+----+ 
|abc|zabc| 
|def| zd| 
+---+----+ 

回答

4

如果您使用的字符串列表子集的字符串列,可以最好地利用broadcast變量。讓我們先從一個更現實的例子在您的字符串仍然包含空格:

df = sc.parallelize([('a b c',),('d e f',)]).toDF() 
df = df.selectExpr("_1 as one",) 
df = df.withColumn("two", lit('z')) 

然後我們創建的信件列表的broadcast變量,因此定義使用它們子集的字符串列表的udf;最後連接它們與另一列中的值,返回一個字符串:

letters = ['a','b','c','d'] 
letters_bd = sc.broadcast(letters) 

def subs(col1, col2): 

    l_subset = [x for x in col1 if x in letters_bd.value] 
    return col2 + ' ' + ' '.join(l_subset) 

subs_udf = udf(subs) 

要應用上面,我們進行子集需要的字符串轉換爲一個列表,所以我們使用功能split(),然後再申請我們udf

from pyspark.sql.functions import col, split 

df.withColumn("three", split(col('one'), r'\W+')) \ 
    .withColumn("three", subs_udf("three", "two")) \ 
    .show() 
+-----+---+-------+ 
| one|two| three| 
+-----+---+-------+ 
|a b c| z|z a b c| 
|d e f| z| z d| 
+-----+---+-------+ 

或者不udf,使用regexp_replaceconcat如果你的信可以舒適地融入regex表達。

from pyspark.sql.functions import regexp_replace, col, concat, lit 

df.withColumn("three", concat(col('two'), lit(' '), 
       regexp_replace(col('one'), '[^abcd]', ' '))) 
+0

'expr'用'regexp_replace'可能是一個更好的選擇性能明智的。在2.1+中,您可以使用[''regexp_replace' alone](https://github.com/apache/spark/blob/v2.1.0/sql/core/src/main/scala/org/apache/spark/sql/ functions.scala#L2324) – zero323

+0

謝謝@ zero323! – mtoto

+0

是有效的答案,但對於我正在處理的應用程序來說,這是不切實際的。對不起,我沒有指定。我收到了超過1000萬個文件(在'one'中)和一個嵌套的「詞典」,其中有超過500萬個關鍵詞,我想匹配它們,並將結果放在'two'中。正則表達式是我的第一種方法,但「喜歡」和「進入」的速度更快,所以我試圖找到一種解決方案。 – Thagor