2017-08-02 22 views
1

我正在構建一個接受「key」=「value」對的原始文本文件並使用PySpark寫入表格/ .csv結構的解析器。將Key/Value對的Pyspark RDD解析爲.csv格式

我在哪裏卡住的是,我可以訪問他們的鍵和值的函數中構建各csv_row,甚至檢查按鍵等於預期鍵(col_list)的列表,但我打電話內的功能processCsv一個拉姆達,我不知道如何將每個csv_row附加到列表l_of_l的全局列表,該列表旨在保存.csv行的最終列表。

如何在鍵/值格式中迭代RDD的每條記錄並解析爲.csv格式?正如你所看到的,我最終的清單清單(l_of_l)是空的,但我可以得到循環內的每一行...令人沮喪。

所有建議讚賞!

原始文本結構(foo.log):

"A"="foo","B"="bar","C"="baz" 
"A"="oof","B"="rab","C"="zab" 
"A"="aaa","B"="bbb","C"="zzz" 

的方法,使遠:

from pyspark import SparkContext 
from pyspark import SQLContext 
from pyspark.sql import Row 

sc=SparkContext('local','foobar') 
sql = SQLContext(sc) 

# Read raw text to RDD 
lines=sc.textFile('foo.log') 
records=lines.map(lambda x: x.replace('"', '').split(",")) 

print 'Records pre-transform:\n' 
print records.take(100) 
print '------------------------------\n' 

def processRecord(record, col_list):  
    csv_row=[] 
    for idx, val in enumerate(record): 
     key, value = val.split('=')   
     if(key==col_list[idx]): 
      # print 'Col name match' 
      # print value 
      csv_row.append(value) 
     else: 
      csv_row.append(None) 
      print 'Key-to-Column Mismatch, dropping value.' 
    print csv_row 
    global l_of_l 
    l_of_l.append(csv_row) 

l_of_l=[] 
colList=['A', 'B', 'C'] 
records.foreach(lambda x: processRecord(x, col_list=colList)) 

print 'Final list of lists:\n' 
print l_of_l 

輸出:

Records pre-transform: 
[[u'A=foo', u'B=bar', u'C=baz'], [u'A=oof', u'B=rab', u'C=zab'], [u'A=aaa', u'B=bbb', u'C=zzz']] 
------------------------------ 

[u'foo', u'bar', u'baz'] 
[u'oof', u'rab', u'zab'] 
[u'aaa', u'bbb', u'zzz'] 

Final list of lists: 
[] 
+0

你能舉一個例子,說明'l_of_l' * *應該是,即什麼是確切的期望的輸出? – desertnaut

+0

當然@desertnaut,感謝您的期待。我希望'print l_of_l'得到: '[''foo','bar','baz'],['oof','rab','zab'],['aaa','bbb ','zzz']]' – gpanda

+0

Upvoted由於良好和乾淨的問題的說明 - 所以很少現在在SO,尤其是從新用戶... – desertnaut

回答

1

試試這個功能:

def processRecord(record, col_list):  
    csv_row=list() 
    for idx, val in enumerate(record): 
     key, value = val.split('=')   
     if(key==col_list[idx]): 
      # print 'Col name match' 
      # print value 
      csv_row.append(value) 
     else: 
      csv_row.append(None) 
      # print 'Key-to-Column Mismatch, dropping value.' 
    return csv_row 

然後

colList=['A', 'B', 'C'] 
l_of_l = records.map(lambda x: processRecord(x, col_list=colList)).collect() 

print 'Final list of lists:\n' 
print l_of_l 

應該給

Final list of lists: 
[[u'foo', u'bar', u'baz'], [u'oof', u'rab', u'zab'], [u'aaa', u'bbb', u'zzz']] 
+0

高手!非常感謝你,我從來沒有想過將l_of_l設置爲records.map * 顯然我有很多東西需要學習 - PySpark的整個匿名函數方面對我來說是一個挑戰,你能推薦任何好的學習資料/來源? (再次感謝!:)) – gpanda

+0

非常歡迎。不幸的是,除了文檔,我沒有意識到PySpark有什麼好的來源...... – desertnaut

+1

嗯,我想這就像任何新東西,通過沖擊它學習,並在朋友的親切幫助:)。一旦我有了更好的把握,希望能在這裏貢獻自己的力量 - 第一週使用PySpark。 – gpanda