2017-10-19 267 views
0

首先,爲標題道歉,我不確定如何簡潔地描述這一點。Spark多個輸出路徑導致多個輸入讀取

我有一個Spark日誌解析到JSON,然後使用spark-sql將特定列轉換爲ORC並寫入各種路徑。例如:

val logs = sc.textFile("s3://raw/logs") 
val jsonRows = logs.mapPartitions(partition => { 
    partition.map(log => { 
    logToJson.parse(log) 
    } 
} 

jsonRows.foreach(r => { 
    val contentPath = "s3://content/events/" 
    val userPath = "s3://users/events/" 
    val contentDf = sqlSession.read.schema(contentSchema).json(r) 
    val userDf = sqlSession.read.schema(userSchema).json(r) 
    val userDfFiltered = userDf.select("*").where(userDf("type").isin("users") 
    // Save Data 
    val contentWriter = contentDf.write.mode("append").format("orc") 
    eventWriter.save(contentPath) 
    val userWriter = userDf.write.mode("append").format("orc") 
    userWriter.save(userPath) 

當我寫這篇文章時,我預計解析會發生一次,然後它會寫入相應的位置。但是,它似乎在執行文件中的所有代碼兩次 - 一次爲content,一次爲users。這是預期的嗎?我寧願我最終不會從S3傳輸數據和解析兩次,因爲這是最大的瓶頸。我從Spark UI中附加了一個圖像,以顯示單個「流」窗口的任務重複。感謝您的任何幫助,您可以提供! Spark Application UI

回答

0

好的,這種嵌套的DFs是不行的。 DataFrames的意思是數據結構爲數據集將不適合正常的數據結構(如SeqList),並且需要以分佈式方式處理。它是而不是只是另一種陣列。你在這裏試圖做的是每個日誌行創建一個DataFrame,這沒什麼意義。

據我可以告訴你在這裏發佈的(不完整)代碼,你想創建兩個新的DataFrames從你原來的輸入(日誌),然後你想存儲在兩個不同的位置。這樣的事情:

val logs = sc.textFile("s3://raw/logs") 
val contentPath = "s3://content/events/" 
val userPath = "s3://users/events/" 

val jsonRows = logs 
    .mapPartitions(partition => { 
    partition.map(log => logToJson.parse(log)) 
    } 
    .toDF() 
    .cache() // Or use persist() if dataset is larger than will fit in memory 

jsonRows 
    .write 
    .format("orc") 
    .save(contentPath) 

jsonRows 
    .filter(col("type").isin("users")) 
    .write 
    .format("orc") 
    .save(userPath) 

希望這會有所幫助。