Spark: split files into multiple folders based on a field

I am trying to split a set of S3 files like the ones below into separate folders, one folder per value of a column. I am not sure what is wrong with the code below.
column 1, column 2
20130401, value1
20130402, value2
20130403, value3
val newDataDF = sqlContext.read.parquet("s3://xxxxxxx-bucket/basefolder/")
newDataDF.cache()
val uniq_days = newDataDF.select(newDataDF("column1")).distinct.show()
uniq_days.cache()
uniq_days.foreach(x => {newDataDF.filter(newDataDF("column1") === x).write.save(s"s3://xxxxxx-bucket/partitionedfolder/$x/")})
Could you please help? Even a PySpark version would be fine. I am looking for the following structure:
s3://xxxxxx-bucket/partitionedfolder/20130401/part-***
column 1, column 2
20130401, value 1
s3://xxxxxx-bucket/partitionedfolder/20130402/part-***
column 1, column 2
20130402, value 1
s3://xxxxxx-bucket/partitionedfolder/20130403/part-***
column 1, column 2
20130403, value 1
Here is the error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 22 in stage 82.0 failed 4 times, most recent failure: Lost task 22.3 in stage 82.0 (TID 2753
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
Caused by: java.lang.NullPointerException
Update with the current solution:
val newDataDF = sqlContext.read.parquet("s3://xxxxxx-bucket/basefolder/")
newDataDF.cache()
val uniq_days = newDataDF.select(newDataDF("column1")).distinct.rdd.map(_.getString(0)).collect().toList
uniq_days.foreach(x => {newDataDF.filter(newDataDF("column1") === x).write.save(s"s3://xxxxxx-bucket/partitionedfolder/$x/")})
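This works because the distinct keys are collected to the driver as plain strings, so the foreach is a local loop and no longer references newDataDF inside a distributed operation (the likely source of the NullPointerException above). As a possible alternative, here is a minimal sketch using Spark's built-in DataFrameWriter.partitionBy, assuming a column1=value folder naming is acceptable (it differs slightly from the layout asked for above) and writing everything in one pass:
// Sketch only: a single write job; folders come out as .../partitionedfolder/column1=20130401/part-***
newDataDF.write
  .partitionBy("column1")
  .parquet("s3://xxxxxx-bucket/partitionedfolder/")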
I don't think so. I have added it now and still get the same error. – androboy
@androboy: based on your edit, I have added more fixes. – RBanerjee
Thanks. I made it val uniq_days = newDataDF.select(newDataDF("mevent_day")).distinct.collect().toList uniq_days.foreach(x => {newDataDF.filter(newDataDF("mevent_day") === x).write.save(s"s3://xxxxxxxx-bucket/partitionedfolder/$x/")}) and the error is java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema [2013-04-02] – androboy
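For context on that last error (a sketch, not part of the original thread): collect() on a DataFrame returns Row objects, so x in the filter above is a Row rather than a string, which Catalyst rejects as an unsupported literal. Extracting the value from each Row first, as the updated solution above does, avoids it:
// Illustrative sketch: turn the collected Rows into plain strings before using them as filter literals
val rows = newDataDF.select("mevent_day").distinct.collect()   // Array[Row]
val days = rows.map(_.getString(0))                            // plain strings, safe to compare against the column
days.foreach { day =>
  newDataDF.filter(newDataDF("mevent_day") === day)
    .write.save(s"s3://xxxxxxxx-bucket/partitionedfolder/$day/")
}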