2016-04-24 31 views
1

我是相對較新的火花,目前我正在包裝我的頭,關於如何(重新)通過將數據從S3導入到日誌文件(地板文件)進行分區。在導入/分離上分區

我在S3中有一堆GZip日誌文件,格式爲{bucket}/{YYYY-MM-DD}/{CustomerId}.log.gz。日誌文件的大小從< 1MB到500MB。

導入時我正在一個pyspark腳本執行以下操作:

# load, unpack and parse file from S3 into an rdd of Rows 
# using just python modules: boto, cStringIO and gzip 

# then: 
rdd = rdd.distinct() 
df = sqlContext.createDataFrame(rdd, schema) 
df = df.withColumn("timeDay", F.date_format(df.time, "yyyy-MM-dd")) 
df.write.parquet("PATH/", mode="append", partitionBy=["timeDay"]) 

的問題,我是(我認爲):

  • distinct將比較的一切一切。 這是合乎邏輯的問題,但它產生了很多洗牌。如何刪除重複行WITHIN一位客戶和一天?
  • 也在distinct它創建(對我來說)正好200個分區,每個客戶和每天的混合數據。所以,如果我輸入有一天,我得到200個分區非常小的文件,但如果我嘗試一個月導入我還可以得到200個分區碰上例外Missing an output location for shuffle 0
    • 我可以定義在distinct分區的數量,但是怎麼會變成這樣解決我的問題?
  • 沒有distinct我得到每輸入文件,這意味着我有一些非常大的分區和一些非常小的這已經是壞的並行我的任務

這將是真的,如果有人有幫助的一個分區的人可以幫助我如何在導入時分割/合併我的文件以獲得更好的並行化,以及如何解決每個客戶和每天需要唯一行的問題。

PS:我運行火花1.5.2與Python 2

,我真的很喜歡這個partitionBy=["timeDay"]作爲第一個分區,因爲有時候我需要重新導入(覆蓋)僅數天之後。

在此先感謝!

回答

0

您可以在DataFrame上使用repartition按列進行分區或重新分區到指定數量的分區。我不完全確定你在問什麼:distinct。如果不將每一行與每一行進行比較,都無法刪除所有重複的行。

+0

感謝您的回答!我注意到由列分區被添加到火花1.6:( –

+0

與''distinct''我的意思是它可能運行它只是一個輸入文件,而不是比較所有輸入文件,因爲不同的文件行不會重複?如果我使用1.6運行不同的+重新分區,它會將大量數據兩次洗牌 –

+0

如果您只想在一個輸入文件中調用不同的分隔符,請將其加載到單獨的RDD中,調用不同的分隔符,然後將它與另一個RDD如果你想刪除第一個文件中出現的其他文件中的所有行,你可能會做一些reduceByKey。至於洗牌,是否有一個你想重新分區的理由呢?最好嘗試運行你的代碼,看它是否運行得足夠快,不需要優化。 –