0
首先,爲標題道歉,我不確定如何簡潔地描述這一點。Spark多個輸出路徑導致多個輸入讀取
我有一個Spark日誌解析到JSON,然後使用spark-sql將特定列轉換爲ORC並寫入各種路徑。例如:
val logs = sc.textFile("s3://raw/logs")
val jsonRows = logs.mapPartitions(partition => {
partition.map(log => {
logToJson.parse(log)
}
}
jsonRows.foreach(r => {
val contentPath = "s3://content/events/"
val userPath = "s3://users/events/"
val contentDf = sqlSession.read.schema(contentSchema).json(r)
val userDf = sqlSession.read.schema(userSchema).json(r)
val userDfFiltered = userDf.select("*").where(userDf("type").isin("users")
// Save Data
val contentWriter = contentDf.write.mode("append").format("orc")
eventWriter.save(contentPath)
val userWriter = userDf.write.mode("append").format("orc")
userWriter.save(userPath)
當我寫這篇文章時,我預計解析會發生一次,然後它會寫入相應的位置。但是,它似乎在執行文件中的所有代碼兩次 - 一次爲content
,一次爲users
。這是預期的嗎?我寧願我最終不會從S3傳輸數據和解析兩次,因爲這是最大的瓶頸。我從Spark UI中附加了一個圖像,以顯示單個「流」窗口的任務重複。感謝您的任何幫助,您可以提供!