我目前正致力於將主要在SQL存儲過程中編寫的古老系統轉移到Scala以在Spark上運行。存儲過程是批量作業,每天/每週/每年一次,在「請求」對象上運行,可能需要幾小時才能運行。SQL存儲過程到Scala/Spark流
由於幾個原因,我們正在將系統更改爲流模型(Spark Streaming)。
在舊的系統中,很多邏輯是用連接語句執行的,其中大量的請求連接了許多表。
一種解決方案是基本上採用相同的SQl代碼並將其移植到Spark SQL語句中,然後該語句將在請求的「微批」上運行。但是,這意味着我們仍然在執行大量的聯接語句,我聽說在Spark SQL中效率低下。
我有第二個想法是把業務邏輯,並編寫代碼,如果我們只需要過程單一請求(也就是說,如果你有10個應用程序,而不是處理所有這些帶有連接,你會編程,就好像您在處理單個請求一樣)。然後,我將採取微批量的請求並通過邏輯處理(即Requests.map(r => RequestLogic.execute(r)))映射它們。
類似下面的示例代碼:
case class Request(id: Int, typeId: Int, value: Long)
def CreateStreamingContext(sparkConf: SparkConf, streamDuration: Duration,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): StreamingContext = {
sparkConf.set(SparkArgumentKeys.MaxCores, (partitionCount * 2).toString)
val ssc = new StreamingContext(sparkConf, streamDuration)
ssc.checkpoint(checkpointDir)
val stream = EventHubsUtils.createUnionStream(ssc, hubParams, storageLevel)
stream.checkpoint(streamDuration)
stream.map(x => Request(x(1), x(2), x(3)))
.map(r => RequestLogic.execute(r))
ssc
}
我想弄清楚:
1)哪一個會變得更好。
2)各有什麼優點/缺點。
我是新來斯卡拉/ Spark和試圖找出最好的方式。我不確定這是否足夠的信息,如果需要,我會嘗試並提供更多細節。