2017-04-13 105 views
0

我的輸入csv數據,某些行包含重複的字段或一些缺少的字段,從這些數據我想從每行刪除重複的字段,然後所有的行應包含所有的字段,與值爲NULL的地方是不包含字段的地方。從輸入數據中刪除RDD中的重複字段

+0

你嘗試過什麼嗎?你有什麼具體問題,或者你只是在尋找某人向你展示解決方案? – Pushkr

+0

@Pushkr,我試着這樣做,通過遍歷每一行並獲取基於Key的字段,如果該鍵重複再次忽略它,並且如果沒有任何關鍵字將其填充爲null。但我的數據集是巨大的,有時也可以連續30到40個重複的K:V對......所以遍歷所有的字段和行對性能而言是一個糟糕的設計......所以想要關於方法的建議。 – user491

回答

0

試試這個:

def transform(line): 
    """ 
    >>> s = 'id:111|name:dave|age:33|city:london' 
    >>> transform(s) 
    ('id:111', {'age': '33', 'name': 'dave', 'city': 'london'}) 
    """ 
    bits = line.split("|") 
    key = bits[0] 
    pairs = [v.split(":") for v in bits[1:]] 
    return key, {kv[0].strip(): kv[1].strip() for kv in pairs if len(kv) == 2} 

rdd = (sc 
    .textFile("/tmp/sample") 
    .map(transform)) 

查找鍵:

from operator import attrgetter 

keys = rdd.values().flatMap(lambda d: d.keys()).distinct().collect() 

創建數據幀:

df = rdd.toDF(["id", "map"]) 

,拓展:

df.select(["id"] + [df["map"][k] for k in keys]).show() 
0

所以我假設你已經從文本文件中獲得了rdd。我創建了一個在這裏:

rdd = spark.sparkContext.parallelize([(u'id:111', u'name:dave', u'dept:marketing', u'age:33', u'city:london'), 
(u'id:123', u'name:jhon', u'dept:hr', u'city:newyork'), 
(u'id:100', u'name:peter', u'dept:marketing', u'name:peter', u'age:30', u'city:london'), 
(u'id:222', u'name:smith', u'dept:finance', u'city:boston'), 
(u'id:234', u'name:peter', u'dept:service', u'name:peter', u'dept:service', u'age:32', u'city:richmond')]) 

我只是使函數映射rddkeyvalue對,並同時刪除重複的一個

from pyspark.sql import Row 
from pyspark.sql.types import * 

def split_to_dict(l): 
    l = list(set(l)) # drop duplicate here 
    kv_list = [] 
    for e in l: 
     k, v = e.split(':') 
     kv_list.append({'key': k, 'value': v}) 
    return kv_list 

rdd_map = rdd.flatMap(lambda l: split_to_dict(l)).map(lambda x: Row(**x)) 
df = rdd_map.toDF() 

輸出前5行的例如

+----+---------+ 
| key| value| 
+----+---------+ 
|city| london| 
|dept|marketing| 
|name|  dave| 
| age|  33| 
| id|  111| 
+----+---------+