
I have some tables and I need to mask some of their columns. The columns to be masked differ from table to table, and I am reading them from the application.conf file. How can I mask columns using Spark 2?
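Since the column lists come from application.conf, a per-table list keeps the lookup simple. A minimal sketch of such a layout (the `mask` block and the table keys are hypothetical, in Typesafe Config / HOCON syntax):

```hocon
# Hypothetical application.conf layout: one list of mask columns per table
mask {
  employee = ["name", "age"]
  payment  = ["name", "salary"]
}
```

With Typesafe Config, `ConfigFactory.load().getStringList("mask.employee")` would then yield the sequence for a given table.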

For example, given the employee table shown below:

+----+------+-----+---------+
| id | name | age | address |
+----+------+-----+---------+
| 1  | abcd | 21  | India   |
| 2  | qazx | 42  | Germany |
+----+------+-----+---------+

If we want to mask the name and age columns, I get those columns in a sequence:

val mask = Seq("name", "age") 

The expected output after masking is:

+----+----------------+----------------+---------+
| id | name           | age            | address |
+----+----------------+----------------+---------+
| 1  | *** Masked *** | *** Masked *** | India   |
| 2  | *** Masked *** | *** Masked *** | Germany |
+----+----------------+----------------+---------+

If I have a DataFrame for the employee table, what is the way to mask those columns?

Similarly, if I have the payment table shown below and want to mask the name and salary columns, I get the mask columns in a sequence:

+----+------+--------+----------+
| id | name | salary | tax_code |
+----+------+--------+----------+
| 1  | abcd | 12345  | KT10     |
| 2  | qazx | 98765  | AD12d    |
+----+------+--------+----------+

val mask = Seq("name", "salary") 

I tried something like mask.foreach(c => base.withColumn(c, regexp_replace(col(c), "^.*?$", "*** Masked ***"))), but it does not return anything.


Thanks to @philantrovert, I found the solution. Here is the solution I used:

def maskData(base: DataFrame, maskColumns: Seq[String]) = { 
    val maskExpr = base.columns.map { col => if(maskColumns.contains(col)) s"'*** Masked ***' as ${col}" else col } 
    base.selectExpr(maskExpr: _*) 
} 
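The masking logic in maskData is just a map over column names, so it can be checked without Spark. A pure-Scala sketch of the expression list that selectExpr receives (column names taken from the payment example above):

```scala
// Build the same expression strings maskData passes to selectExpr,
// without needing a SparkSession.
val columns     = Seq("id", "name", "salary", "tax_code")
val maskColumns = Seq("name", "salary")

val maskExpr = columns.map { col =>
  if (maskColumns.contains(col)) s"'*** Masked ***' as ${col}" else col
}

println(maskExpr.mkString(", "))
// id, '*** Masked ***' as name, '*** Masked ***' as salary, tax_code
```

Non-mask columns pass through by name, so the output schema and column order are preserved.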

Answers


Your statement

mask.foreach(c => base.withColumn(c, regexp_replace(col(c), "^.*?$", "*** Masked ***"))) 

discards every DataFrame that withColumn creates (foreach returns Unit), which doesn't sound too good.
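Because withColumn returns a new DataFrame rather than mutating base, the results have to be threaded through the calls; foldLeft does exactly that. A pure-Scala analog of the idiom, with a Map standing in for the DataFrame (values hypothetical):

```scala
// foldLeft threads the accumulator through every masked column,
// instead of discarding each intermediate result the way foreach does.
val mask = Seq("name", "age")
val row  = Map("id" -> "1", "name" -> "abcd", "age" -> "21", "address" -> "India")

val maskedRow = mask.foldLeft(row) { (acc, c) =>
  acc.updated(c, "*** Masked ***")
}

println(maskedRow("name"))  // *** Masked ***
```

In Spark the same shape would be mask.foldLeft(base)((df, c) => df.withColumn(c, ...)).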

Instead, you can use selectExpr and generate the expressions with regexp_replace:

base.show
+---+----+-----+-------+
| id|name|  age|address|
+---+----+-----+-------+
|  1|abcd|12345|   KT10|
|  2|qazx|98765|  AD12d|
+---+----+-----+-------+

val mask = Seq("name", "age") 
val expr = base.columns.map { col =>
    if (mask.contains(col)) s"""regexp_replace(${col}, "^.*", "** Masked **") as ${col}""" 
    else col 
} 

This generates a regexp_replace expression for every column present in mask:

Array[String] = Array(id, regexp_replace(name, "^.*", "** Masked **") as name, regexp_replace(age, "^.*", "** Masked **") as age, address) 

Now you can call selectExpr on the generated sequence of expressions:

base.selectExpr(expr: _*).show

+---+------------+------------+-------+
| id|        name|         age|address|
+---+------------+------------+-------+
|  1|** Masked **|** Masked **|   KT10|
|  2|** Masked **|** Masked **|  AD12d|
+---+------------+------------+-------+

Thanks. It works. – Shekhar


Please check the code below. The key is the udf function.

val df = ss.sparkContext.parallelize(Seq(
    ("c1", "JAN-2017", 49),
    ("c1", "MAR-2017", 83)
)).toDF("city", "month", "sales")
df.show()

val mask = udf((s : String) => { 
    "*** Masked ***" 
}) 

df.withColumn("city", mask($"city")).show

The simplest and fastest way is to use withColumn and simply overwrite the values in those columns with "*** Masked ***". Using your small example, the DataFrame is:

val df = spark.sparkContext.parallelize(Seq (
    (1, "abcd", 12345, "KT10"), 
    (2, "qazx", 98765, "AD12d") 
)).toDF("id", "name", "salary", "tax_code") 

If the number of columns to be masked is small and their names are known, then you can simply do:

val mask = Seq("name", "salary") 

df.withColumn("name", lit("*** Masked ***")) 
    .withColumn("salary", lit("*** Masked ***")) 

Otherwise, you need to create a loop:

var df2 = df 
for (col <- mask){ 
    df2 = df2.withColumn(col, lit("*** Masked ***")) 
} 

Both approaches will give you a result like this:

+---+--------------+--------------+--------+
| id|          name|        salary|tax_code|
+---+--------------+--------------+--------+
|  1|*** Masked ***|*** Masked ***|    KT10|
|  2|*** Masked ***|*** Masked ***|   AD12d|
+---+--------------+--------------+--------+