
Spark: splitting a file into multiple folders based on a field

I am trying to split a set of S3 files like the one below into separate folders, one folder per value of a column. I am not sure what is wrong with the code below.

column 1, column 2 
20130401, value1 
20130402, value2 
20130403, value3 

val newDataDF = sqlContext.read.parquet("s3://xxxxxxx-bucket/basefolder/")
newDataDF.cache()
val uniq_days = newDataDF.select(newDataDF("column1")).distinct.show()
uniq_days.cache()
uniq_days.foreach(x => {newDataDF.filter(newDataDF("column1") === x).write.save(s"s3://xxxxxx-bucket/partitionedfolder/$x/")})

Could you please help? Even a PySpark version would be fine. I am looking for the following structure.

s3://xxxxxx-bucket/partitionedfolder/20130401/part-*** 

    column 1, column 2 
    20130401, value 1 
s3://xxxxxx-bucket/partitionedfolder/20130402/part-*** 

    column 1, column 2 
    20130402, value 2
s3://xxxxxx-bucket/partitionedfolder/20130403/part-*** 

    column 1, column 2 
    20130403, value 3

Here is the error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 22 in stage 82.0 failed 4 times, most recent failure: Lost task 22.3 in stage 82.0 (TID 2753 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454) 
Caused by: java.lang.NullPointerException 

Update with the current working solution:

val newDataDF = sqlContext.read.parquet("s3://xxxxxx-bucket/basefolder/") 
newDataDF.cache() 
val uniq_days = newDataDF.select(newDataDF("column1")).distinct.rdd.map(_.getString(0)).collect().toList 
uniq_days.foreach(x => {newDataDF.filter(newDataDF("column1") === x).write.save(s"s3://xxxxxx-bucket/partitionedfolder/$x/")}) 
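
For reference, a minimal sketch of an alternative that avoids looping over the distinct values, assuming Spark 1.4+ and that a column1=<value> naming of the subfolders is acceptable: DataFrameWriter.partitionBy writes one subfolder per distinct value of the column in a single pass (paths and column name are taken from the question).

val newDataDF = sqlContext.read.parquet("s3://xxxxxxx-bucket/basefolder/")
// one subfolder per distinct value of column1,
// e.g. .../partitionedfolder/column1=20130401/part-***
newDataDF.write
    .partitionBy("column1")
    .mode("overwrite")    // assumption: overwriting the target prefix is acceptable
    .parquet("s3://xxxxxx-bucket/partitionedfolder/")

The only difference from the layout requested above is the column1= prefix in the folder names; the loop-based approach is still needed if the plain 20130401/ names are a hard requirement.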

Answer


I think you missed the "s" in your save call. :)

http://docs.scala-lang.org/overviews/core/string-interpolation.html#the-s-string-interpolator

Change:

write.save("s3://xxxxxx-bucket/partitionedfolder/$x/")}) 

To:

write.save(s"s3://xxxxxx-bucket/partitionedfolder/$x/")}) 

There is one more problem: show() never returns a value.

Change:

val uniq_days = newDataDF.select(newDataDF("mevent_day")).distinct.show() 
uniq_days.cache() 

To:

val uniq_days = newDataDF.select(newDataDF("mevent_day")).distinct.rdd.map(_.getString(0)).collect().toList 

I don't think so. I added it again just now, and it is still the same error. – androboy


@androboy: Based on your edit, I have added more fixes. – RBanerjee


Thanks. I made it val uniq_days = newDataDF.select(newDataDF("mevent_day")).distinct.collect().toList and uniq_days.foreach(x => {newDataDF.filter(newDataDF("mevent_day") === x).write.save(s"s3://xxxxxxxx-bucket/partitionedfolder/$x/")}). The error is java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema [2013-04-02] – androboy
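
For reference: the error in the comment above comes from the fact that distinct.collect().toList returns a List[Row], so x is a whole Row, and filtering with === against a Row fails with the "Unsupported literal type ... GenericRowWithSchema" exception. A minimal sketch of a fix for that exact variant is to pull the string out of the Row before comparing; the update in the question achieves the same thing with .rdd.map(_.getString(0)) before collecting.

// x is a Row here; take its single field as a String before filtering on it
uniq_days.foreach(x => {
    val day = x.getString(0)
    newDataDF.filter(newDataDF("mevent_day") === day)
        .write.save(s"s3://xxxxxxxx-bucket/partitionedfolder/$day/")
})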
