2016-04-17 106 views
1

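Conditionally splitting comma-separated values in a PySpark list

I am trying to run a job in PySpark. My data is in an RDD created with the PySpark SparkContext (sc) as follows: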

directory_file = sc.textFile('directory.csv')

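*I don't think Python's csv module can be used on data inside an RDD.

This produces a list for each row in the csv. I know it is messy, but here is a sample of one list (equivalent to one row of the original CSV):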

[u'14K685,El Puente Academy for Peace and Justice,Brooklyn,K778,718-387-1125,718-387-4229,9,12,,,"B24, B39, B44, B44-SBS, B46, B48, B57, B60, B62, Q54, Q59","G to Broadway ; J, M to Hewes St ; Z to Marcy Ave",250 Hooper Street,Brooklyn,NY,11211,www.elpuente.us,225,N/A,Consortium School,"We are a small, innovative learning community that promotes comprehensive academic excellence for all students while inspiring and nurturing leadership for peace and justice. Our state-of-the-art facility allows for a creative and intellectually challenging environment where every student thrives. Our project-based curriculum is designed to prepare students to be active citizens and independent thinkers who share a passion for transforming their communities and the world into a better place. Our trimester system allows students to complete most of their high school credits by the 11th grade, opening opportunities for exciting internships and college courses during the school day in their senior year.","Accelerated credit accumulation (up to 18 credits per year), iLearn, iZone 360, Year-long SAT (Scholastic Aptitude Test) preparatory course, Individualized college counseling, Early College Awareness & Preparatory Program (ECAPP). 
Visits to college campuses in NYC, Visits to colleges outside NYC in partnership with the El Puente Leadership Center, Internships, Community-based Projects, Portfolio Assessment, Integrated-Arts Projects, Before- and After-school Tutoring; Elective courses include: Drama, Dance (Men\'s and Women\'s Groups), Debate Team partnership with Tufts University, Guitar, Filmmaking, Architecture, Glee",Spanish,,,,"AM and PM Academic Support, B-Boy/B-Girl, Chorus, College and Vocational Counseling and Placement, College Prep, Community Development Project, Computers, Dance Level 1 and 2, Individual Drama; Education for Public Inquiry and International Citizenship (EPIIC), El Puente Leadership Center, Film, Fine Arts, Liberation, Media, Men\u2019s and Women\u2019s Groups, Movement Theater Level 1, Movement Theater Level 2, Music, Music Production, Pre-professional training in Dance, PSAT/SAT Prep, Spoken Word, Student Council, Teatro El Puente, Visual Art",,,,"Boys & Girls Basketball, Baseball, Softball, Volleyball",El Puente Williamsburg Leadership Center; The El Puente Bushwick Center; Leadership Center at Taylor-Wythe Houses; Beacon Leadership Center at MS50.,"Woodhull Medical Center, Governor Hospital","Hunter College (CUNY), Eugene Lang College The New School for Liberal Arts, Pratt College of Design, Tufts University, and Touro College.","El Puente Leadership Center, El Puente Bushwick Center, Beacon Leadership Center at MS50, Leadership Center at Taylor-Wythe Houses, Center for Puerto Rican Studies, Hip- Hop Theatre Festival, Urban Word, and Summer Search.",,,,,Our school requires assessment of an Academic Portfolio for graduation.,,9:00 AM,3:30 PM,This school will provide students with disabilities the supports and services indicated on their IEPs.,ESL,Not Functionally Accessible,1,Priority to Brooklyn students or residents,Then to New York City residents,,,,,,,,,"250 Hooper Street'] 

I want to split each item using the comma as a delimiter, except when the comma sits between double quotes (e.g. ", , ,").

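parsed = directory_file.map(lambda x: x.split(',')) obviously does not handle the commas between double quotes. Is there a way to do this? I have seen this question, which specifically mentions csv, but since in this case the csv is first loaded into a Spark RDD, I'm pretty sure the csv module doesn't apply here.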
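To make the problem concrete, here is a minimal sketch using a made-up three-column row (not taken from directory.csv). It only illustrates how a plain split cuts a quoted field in two; no Spark is needed to see the effect.

```python
# Hypothetical row: the second column contains a comma inside double quotes.
row = 'col1,"col2,blabla",col3'

# A plain split treats every comma as a delimiter, quoted or not.
naive = row.split(',')
print(naive)  # the quoted field ends up split across two list items
```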

Thanks.

Answers

0

With your data, this should work:

# `csv` holds one CSV row as a str.
new_csv = [""]
inside_quotes = False
pos = 0
for letter in csv:
    if letter == ",":
        if inside_quotes:
            new_csv[pos] += letter
        else:
            new_csv.append("")
            pos += 1
    elif letter == '"':
        inside_quotes = not inside_quotes  # Flip inside_quotes between True and False.
    else:
        new_csv[pos] += letter

new_csv = [x for x in new_csv if x != '']  # Remove all '' entries (this also drops genuinely empty fields).
print(new_csv)

Output

['14K685', 'El Puente Academy for Peace and Justice', 'Brooklyn', 'K778', '718-387-1125', '718-387-4229', '9', '12', 'B24, B39, B44, B44-SBS, B46, B48, B57, B60, B62, Q54, Q59', 'G to Broadway ; J, M to Hewes St ; Z to Marcy Ave', '250 Hooper Street', 'Brooklyn', 'NY', '11211', 'www.elpuente.us', '225', 'N/A', 'Consortium School', 'We are a small, innovative learning community that promotes comprehensive academic excellence for all students while inspiring and nurturing leadership for peace and justice. Our state-of-the-art facility allows for a creative and intellectually challenging environment where every student thrives. Our project-based curriculum is designed to prepare students to be active citizens and independent thinkers who share a passion for transforming their communities and the world into a better place. Our trimester system allows students to complete most of their high school credits by the 11th grade, opening opportunities for exciting internships and college courses during the school day in their senior year.', "Accelerated credit accumulation (up to 18 credits per year), iLearn, iZone 360, Year-long SAT (Scholastic Aptitude Test) preparatory course, Individualized college counseling, Early College Awareness & Preparatory Program (ECAPP). 
Visits to college campuses in NYC, Visits to colleges outside NYC in partnership with the El Puente Leadership Center, Internships, Community-based Projects, Portfolio Assessment, Integrated-Arts Projects, Before- and After-school Tutoring; Elective courses include: Drama, Dance (Men's and Women's Groups), Debate Team partnership with Tufts University, Guitar, Filmmaking, Architecture, Glee", 'Spanish', 'AM and PM Academic Support, B-Boy/B-Girl, Chorus, College and Vocational Counseling and Placement, College Prep, Community Development Project, Computers, Dance Level 1 and 2, Individual Drama; Education for Public Inquiry and International Citizenship (EPIIC), El Puente Leadership Center, Film, Fine Arts, Liberation, Media, Men’s and Women’s Groups, Movement Theater Level 1, Movement Theater Level 2, Music, Music Production, Pre-professional training in Dance, PSAT/SAT Prep, Spoken Word, Student Council, Teatro El Puente, Visual Art', 'Boys & Girls Basketball, Baseball, Softball, Volleyball', 'El Puente Williamsburg Leadership Center; The El Puente Bushwick Center; Leadership Center at Taylor-Wythe Houses; Beacon Leadership Center at MS50.', 'Woodhull Medical Center, Governor Hospital', 'Hunter College (CUNY), Eugene Lang College The New School for Liberal Arts, Pratt College of Design, Tufts University, and Touro College.', 'El Puente Leadership Center, El Puente Bushwick Center, Beacon Leadership Center at MS50, Leadership Center at Taylor-Wythe Houses, Center for Puerto Rican Studies, Hip- Hop Theatre Festival, Urban Word, and Summer Search.', 'Our school requires assessment of an Academic Portfolio for graduation.', '9:00 AM', '3:30 PM', 'This school will provide students with disabilities the supports and services indicated on their IEPs.', 'ESL', 'Not Functionally Accessible', '1', 'Priority to Brooklyn students or residents', 'Then to New York City residents', '250 Hooper Street'] 

How it works

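  1. Initialize a list, new_csv, containing a single empty string. This will hold the final output.

  2. Initialize a bool, inside_quotes, which tells the program whether the character being parsed is inside or outside quotes.

  3. Initialize an int, pos, which tracks the current position in the new_csv list.

  4. Iterate over every character in the string.

  5. Check whether the character is ,

    • Check whether we are currently parsing inside quotes.

      • If True, append the , to the current string in new_csv.

      • If False, do not append it; instead append a new empty string to new_csv and do pos += 1

  6. If not, check whether the character is "

    • If so, flip the bool inside_quotes with the not keyword: False becomes True, and True becomes False.
  7. If it is any other character, just append it to the current string in the list.

  8. Clean up by removing all empty strings, '', from the list.

  9. Print it :).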
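The steps above can be sketched as a function that takes a single line, which is the shape an RDD `map` expects. This is only a sketch: the name `split_outside_quotes` and the `drop_empty` flag are hypothetical additions, and the final comment assumes a live SparkContext `sc` that is not created here.

```python
def split_outside_quotes(line, drop_empty=True):
    """Split one CSV line on commas that are not inside double quotes."""
    fields = [""]
    inside_quotes = False
    for ch in line:
        if ch == '"':
            # Entering or leaving a quoted section; the quote itself is dropped.
            inside_quotes = not inside_quotes
        elif ch == "," and not inside_quotes:
            # A delimiter comma starts a new field.
            fields.append("")
        else:
            # Any other character, including a quoted comma, joins the current field.
            fields[-1] += ch
    if drop_empty:
        # Same clean-up as the answer; pass drop_empty=False to keep column positions.
        fields = [f for f in fields if f != ""]
    return fields


sample = 'col1,"col2,blabla",,col3'  # hypothetical row with one empty column
print(split_outside_quotes(sample))
print(split_outside_quotes(sample, drop_empty=False))

# With a SparkContext `sc` available, the same function could be mapped over lines:
# parsed = sc.textFile('directory.csv').map(split_outside_quotes)
```

Keeping the empty strings may be preferable when columns are later addressed by index, since dropping them shifts every field that follows an empty one.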

+0

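Thanks for the response, but my case is specific to a list that exists as an RDD object; since that is not iterable, this won't work for me. It's a pretty odd situation. – dstar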

+0

Sorry, I'm not familiar with Spark. Is there a way to make it iterable? Also, I'm only iterating over a 'str' here. – Signal

0

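This is a very common problem when reading tables. Thankfully, Python has a library that does this for you, so you don't have to do it yourself. You said the csv module doesn't work; why not? If it doesn't work, try the code below and leave a comment!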

import csv

# Please note: KEEP YOUR FILE AS STRINGS when you read in your data.
# Don't try to split it or otherwise process it first.
my_rdd = sc.textFile("/your/file/location/*")
split_with_quotes = my_rdd.map(lambda row: next(csv.reader(row.splitlines(), skipinitialspace=True)))

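Note that the csv module's parser limits a field to 131,072 characters by default, so if you have very long strings you will need to do some more work.

To check whether this applies to you, run: my_rdd.filter(lambda x: len(x) >= 131072).count(). If the count is not 0, your strings are too long.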
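As a rough illustration of both points, the sketch below parses a single line with csv.reader outside Spark and raises the limit with csv.field_size_limit before parsing an over-long field. The helper name `parse_row`, its `limit` parameter, and the sample rows are hypothetical. On a cluster the limit would presumably have to be raised inside the function that runs on the workers; that is an assumption, not something the answer states.

```python
import csv


def parse_row(row, limit=None):
    """Parse one CSV line; optionally raise the csv field-size limit first."""
    if limit is not None:
        # Module-level setting: it applies to readers created afterwards.
        csv.field_size_limit(limit)
    return next(csv.reader([row], skipinitialspace=True))


short_row = 'col1,"col2,blabla",,col3'  # hypothetical row
print(parse_row(short_row))

# Hypothetical row whose quoted field is longer than the default limit.
long_row = 'a,"' + 'x' * 200000 + '",b'
fields = parse_row(long_row, limit=1 << 20)
print(len(fields), len(fields[1]))
```

Unlike the character-by-character approach, csv.reader keeps empty fields, so column positions are preserved.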

+0

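Thanks @Katya... I was mainly looking into particular characteristics of Spark that might prevent outside modules from being imported. Your solution certainly works outside of Spark. – dstar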

+0

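@dstar, I'm confused about what you want, but I can tell you that this does work inside Spark. I have used this exact code on Spark RDDs many times. Are you worried about using external packages because you don't have access to them? –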

2

You can use a regular expression. It runs quite fast in PySpark:

import re
rdd = sc.textFile("factbook.csv")

# Get rid of those commas we do not need
pattern = re.compile(r'(.*".*)(,)(.*".*)', re.M | re.I)

def drop_quoted_comma(x):
    # Join group 1 and group 3 with a space when the pattern matches; otherwise keep the line.
    m = pattern.match(x)
    return m.group(1) + " " + m.group(3) if m is not None else x

cleanedRdd = rdd.map(drop_quoted_comma)

So, for each line like this:

col1,"col2,blabla",col3 

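This code matches the line against the regular-expression pattern. If the pattern matches, it creates 3 groups:

  • Group 1: col1,"col2
  • Group 2: ,
  • Group 3: blabla",col3

Finally, we concatenate group 1 and group 3 with a space between them, and the output will be: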

col1,"col2 blabla",col3
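The grouping described above can be checked locally without Spark. The sketch below reuses the answer's pattern; the helper name `replace_one_quoted_comma` and the extra sample lines are hypothetical and serve only to show where the approach stops working.

```python
import re

# Same pattern and flags as in the answer.
pattern = re.compile(r'(.*".*)(,)(.*".*)', re.M | re.I)


def replace_one_quoted_comma(line):
    """Join group 1 and group 3 with a space when the pattern matches."""
    m = pattern.match(line)
    return m.group(1) + " " + m.group(3) if m is not None else line


line = 'col1,"col2,blabla",col3'
print(pattern.match(line).groups())
print(replace_one_quoted_comma(line))

# A line without quotes does not match and is returned unchanged.
print(replace_one_quoted_comma('a,b,c'))

# Two commas inside one quoted field: a single pass replaces only one of them.
print(replace_one_quoted_comma('x,"a,b,c",y'))
```

Because each pass replaces a single comma, a line with several commas inside one quoted field would need repeated passes or a real CSV parser.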