2016-11-14 17 views
0

我有json文件,我試圖用SHA 256對它的一個字段進行哈希處理。這些文件位於AWS S3上。我目前在Apache Zeppelin上使用python進行Spark。使用json模式更新spark數據框中的列

這是我的json架構,我試圖散列'mac'字段;

|-- Document: struct (nullable = true) 
| |-- data: array (nullable = true) 
| | |-- element: struct (containsNull = true) 
| | | |-- mac: string (nullable = true) 

我試過幾件事;

from pyspark.sql.functions import UserDefinedFunction 
from pyspark.sql.types import StringType 
import hashlib 

hcData = sqlc.read.option("inferSchema","true").json(inputPath) 
hcData.registerTempTable("hcData") 


name = 'Document' 
udf = UserDefinedFunction(lambda x: hashlib.sha256(str(x).encode('utf-8')).hexdigest(), StringType()) 
new_df = hcData.select(*[udf(column).alias(name) if column == name else column for column in hcData.columns]) 

此代碼正常工作。但是,當我嘗試散列mac字段和更改名稱變量什麼也沒有發生;

name = 'Document.data[0].mac' 
name = 'mac' 

我想這是因爲它無法找到給定名稱的列。

我試着改變一下代碼;

def valueToCategory(value): 
    return hashlib.sha256(str(value).encode('utf-8')).hexdigest() 


udfValueToCategory = udf(valueToCategory, StringType()) 
df = hcData.withColumn("Document.data[0].mac",udfValueToCategory("Document.data.mac")) 

這個散列代碼 「Document.data.mac」 和與散列MAC地址創建新列。我想更新現有的列。對於那些沒有嵌套的變量可以更新,沒有問題,但對於嵌套變量,我找不到更新的方法。

所以基本上,我想用spark python散列嵌套的json文件中的字段。任何人都可以知道如何更新火花數據框與模式

回答

0

這裏是我下面的問題蟒蛇解決方案。

from pyspark.sql.functions import UserDefinedFunction 
from pyspark.sql.types import StringType 
import hashlib 
import re 


def find(s, r): 
    l = re.findall(r, s) 
    if(len(l)!=0): 
     return l 
    else: 
     lis = ["null"] 
     return lis 



def hash(s): 
    return hashlib.sha256(str(s).encode('utf-8')).hexdigest() 



def hashAll(s, r): 
    st = s 
    macs = re.findall(r, s) 
    for mac in macs: 
     st = st.replace(mac, hash(mac)) 
    return st 


rdd = sc.textFile(inputPath) 

regex = "([0-9A-Z]{1,2}[:-]){5}([0-9A-Z]{1,2})" 
hashed_rdd = rdd.map(lambda line: hashAll(line, regex)) 

hashed_rdd.saveAsTextFile(outputPath) 
0

那麼,我發現我的問題的解決方案與斯卡拉。可以有多餘的代碼,但它仍然有效。

import scala.util.matching.Regex 
import java.security.MessageDigest 

val inputPath = "" 
val outputPath = "" 

//finds mac addresses with given regex 
def find(s: String, r: Regex): List[String] = { 
    val l = r.findAllIn(s).toList 
    if(!l.isEmpty){ 
     return l 
    } else { 
     val lis: List[String] = List("null") 
     return lis 
    } 
} 

//hashes given string with sha256 
def hash(s: String): String = { 
    return MessageDigest.getInstance("SHA-256").digest(s.getBytes).map(0xFF & _).map { "%02x".format(_) }.foldLeft(""){_ + _} 
} 

//hashes given line 
def hashAll(s: String, r:Regex): String = { 
    var st = s 
    val macs = find(s, r) 
    for (mac <- macs){ 
     st = st.replaceAll(mac, hash(mac)) 
    } 
    return st 
} 

//read data 
val rdd = sc.textFile(inputPath) 

//mac address regular expression 
val regex = "(([0-9A-Z]{1,2}[:-]){5}([0-9A-Z]{1,2}))".r 

//hash data 
val hashed_rdd = rdd.map(line => hashAll(line, regex)) 

//write hashed data 
hashed_rdd.saveAsTextFile(outputPath) 
相關問題