我有一組使用嵌套鍵值對的大型壓縮json文件。 json對象中有大約70-80個鍵(和子鍵),但是,我只對幾個鍵感興趣。我想用Spark SQL查詢json文件,只挑出我感興趣的鍵值對,並將它們輸出到一組csv文件。處理一個大小爲170MB的壓縮json文件需要大約5分鐘的時間。我只是想知道是否有任何方法來優化這個過程。或者除了Spark這樣的工作,還有其他更好的工具嗎?謝謝!快速處理Spark中的json文件的方法
下面是我用的是Scala代碼的快照:
val data = sc.textFile("abcdefg.txt.gz")
// repartition the data
val distdata = data.repartition(10)
val dataDF = sqlContext.read.json(distdata)
// register a temp table
dataDF.registerTempTable("pixels")
// query the json file, grab columns of interest
val query =
"""
|SELECT col1, col2, col3, col4, col5
|FROM pixels
|WHERE col1 IN (col1_v1, col1_v2, ...)
""".stripMargin
val result = sqlContext.sql(query)
// reformat the timestamps
val result2 = result.map(
row => {
val timestamp = row.getAs[String](0).stripSuffix("Z").replace("T"," ")
Row(timestamp, row(1), row(2), row(3), row(4), row(5), row(6), row(7),
row(8), row(9), row(10), row(11))
}
)
// output the result to a csv and remove the square bracket in each row
val output_file = "/root/target"
result2.map(row => row.mkString(",")).saveAsTextFile(output_file)
我;猜大部分時間的推移讀/解壓縮和寫作,這不能並行化。添加分配作業和收集結果的開銷,我的猜測是使用Spark會讓你放慢速度。爲什麼未分析的行的「重新分配」? –
如果你只是想改變你的數據。你不需要所有的SparkSQL功能。只要堅持RDD的。使用像PlayJson這樣的快速json庫來解析json。修改並轉儲它。 –
除非明確要求,否則請勿對RDD進行重新分區。 –