0
我的輸入csv數據,某些行包含重複的字段或一些缺少的字段,從這些數據我想從每行刪除重複的字段,然後所有的行應包含所有的字段,與值爲NULL的地方是不包含字段的地方。從輸入數據中刪除RDD中的重複字段
我的輸入csv數據,某些行包含重複的字段或一些缺少的字段,從這些數據我想從每行刪除重複的字段,然後所有的行應包含所有的字段,與值爲NULL的地方是不包含字段的地方。從輸入數據中刪除RDD中的重複字段
試試這個:
def transform(line):
"""
>>> s = 'id:111|name:dave|age:33|city:london'
>>> transform(s)
('id:111', {'age': '33', 'name': 'dave', 'city': 'london'})
"""
bits = line.split("|")
key = bits[0]
pairs = [v.split(":") for v in bits[1:]]
return key, {kv[0].strip(): kv[1].strip() for kv in pairs if len(kv) == 2}
rdd = (sc
.textFile("/tmp/sample")
.map(transform))
查找鍵:
from operator import attrgetter
keys = rdd.values().flatMap(lambda d: d.keys()).distinct().collect()
創建數據幀:
df = rdd.toDF(["id", "map"])
,拓展:
df.select(["id"] + [df["map"][k] for k in keys]).show()
所以我假設你已經從文本文件中獲得了rdd
。我創建了一個在這裏:
rdd = spark.sparkContext.parallelize([(u'id:111', u'name:dave', u'dept:marketing', u'age:33', u'city:london'),
(u'id:123', u'name:jhon', u'dept:hr', u'city:newyork'),
(u'id:100', u'name:peter', u'dept:marketing', u'name:peter', u'age:30', u'city:london'),
(u'id:222', u'name:smith', u'dept:finance', u'city:boston'),
(u'id:234', u'name:peter', u'dept:service', u'name:peter', u'dept:service', u'age:32', u'city:richmond')])
我只是使函數映射rdd
到key
和value
對,並同時刪除重複的一個
from pyspark.sql import Row
from pyspark.sql.types import *
def split_to_dict(l):
l = list(set(l)) # drop duplicate here
kv_list = []
for e in l:
k, v = e.split(':')
kv_list.append({'key': k, 'value': v})
return kv_list
rdd_map = rdd.flatMap(lambda l: split_to_dict(l)).map(lambda x: Row(**x))
df = rdd_map.toDF()
輸出前5行的例如
+----+---------+
| key| value|
+----+---------+
|city| london|
|dept|marketing|
|name| dave|
| age| 33|
| id| 111|
+----+---------+
你嘗試過什麼嗎?你有什麼具體問題,或者你只是在尋找某人向你展示解決方案? – Pushkr
@Pushkr,我試着這樣做,通過遍歷每一行並獲取基於Key的字段,如果該鍵重複再次忽略它,並且如果沒有任何關鍵字將其填充爲null。但我的數據集是巨大的,有時也可以連續30到40個重複的K:V對......所以遍歷所有的字段和行對性能而言是一個糟糕的設計......所以想要關於方法的建議。 – user491