我正在構建一個接受「key」=「value」對的原始文本文件並使用PySpark寫入表格/ .csv結構的解析器。將Key/Value對的Pyspark RDD解析爲.csv格式
我在哪裏卡住的是,我可以訪問他們的鍵和值的函數中構建各csv_row
,甚至檢查按鍵等於預期鍵(col_list
)的列表,但我打電話內的功能processCsv
一個拉姆達,我不知道如何將每個csv_row
附加到列表l_of_l
的全局列表,該列表旨在保存.csv行的最終列表。
如何在鍵/值格式中迭代RDD的每條記錄並解析爲.csv格式?正如你所看到的,我最終的清單清單(l_of_l
)是空的,但我可以得到循環內的每一行...令人沮喪。
所有建議讚賞!
原始文本結構(foo.log):
"A"="foo","B"="bar","C"="baz"
"A"="oof","B"="rab","C"="zab"
"A"="aaa","B"="bbb","C"="zzz"
的方法,使遠:
from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql import Row
sc=SparkContext('local','foobar')
sql = SQLContext(sc)
# Read raw text to RDD
lines=sc.textFile('foo.log')
records=lines.map(lambda x: x.replace('"', '').split(","))
print 'Records pre-transform:\n'
print records.take(100)
print '------------------------------\n'
def processRecord(record, col_list):
csv_row=[]
for idx, val in enumerate(record):
key, value = val.split('=')
if(key==col_list[idx]):
# print 'Col name match'
# print value
csv_row.append(value)
else:
csv_row.append(None)
print 'Key-to-Column Mismatch, dropping value.'
print csv_row
global l_of_l
l_of_l.append(csv_row)
l_of_l=[]
colList=['A', 'B', 'C']
records.foreach(lambda x: processRecord(x, col_list=colList))
print 'Final list of lists:\n'
print l_of_l
輸出:
Records pre-transform:
[[u'A=foo', u'B=bar', u'C=baz'], [u'A=oof', u'B=rab', u'C=zab'], [u'A=aaa', u'B=bbb', u'C=zzz']]
------------------------------
[u'foo', u'bar', u'baz']
[u'oof', u'rab', u'zab']
[u'aaa', u'bbb', u'zzz']
Final list of lists:
[]
你能舉一個例子,說明'l_of_l' * *應該是,即什麼是確切的期望的輸出? – desertnaut
當然@desertnaut,感謝您的期待。我希望'print l_of_l'得到: '[''foo','bar','baz'],['oof','rab','zab'],['aaa','bbb ','zzz']]' – gpanda
Upvoted由於良好和乾淨的問題的說明 - 所以很少現在在SO,尤其是從新用戶... – desertnaut