2016-08-30 71 views
1

我目前正致力於將主要在SQL存儲過程中編寫的古老系統轉移到Scala以在Spark上運行。存儲過程是批量作業,每天/每週/每年一次,在「請求」對象上運行,可能需要幾小時才能運行。SQL存儲過程到Scala/Spark流

由於幾個原因,我們正在將系統更改爲流模型(Spark Streaming)。

在舊的系統中,很多邏輯是用連接語句執行的,其中大量的請求連接了許多表。

一種解決方案是基本上採用相同的SQl代碼並將其移植到Spark SQL語句中,然後該語句將在請求的「微批」上運行。但是,這意味着我們仍然在執行大量的聯接語句,我聽說在Spark SQL中效率低下。

我有第二個想法是把業務邏輯,並編寫代碼,如果我們只需要過程單一請求(也就是說,如果你有10個應用程序,而不是處理所有這些帶有連接,你會編程,就好像您在處理單個請求一樣)。然後,我將採取微批量的請求並通過邏輯處理(即Requests.map(r => RequestLogic.execute(r)))映射它們。

類似下面的示例代碼:

case class Request(id: Int, typeId: Int, value: Long) 

def CreateStreamingContext(sparkConf: SparkConf, streamDuration: Duration, 
          storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): StreamingContext = { 

    sparkConf.set(SparkArgumentKeys.MaxCores, (partitionCount * 2).toString) 
    val ssc = new StreamingContext(sparkConf, streamDuration) 
    ssc.checkpoint(checkpointDir) 

    val stream = EventHubsUtils.createUnionStream(ssc, hubParams, storageLevel) 
    stream.checkpoint(streamDuration) 

    stream.map(x => Request(x(1), x(2), x(3))) 
     .map(r => RequestLogic.execute(r)) 

    ssc 
} 

我想弄清楚:

1)哪一個會變得更好。
2)各有什麼優點/缺點。

我是新來斯卡拉/ Spark和試圖找出最好的方式。我不確定這是否足夠的信息,如果需要,我會嘗試並提供更多細節。

回答

0

有趣的問題,答案取決於你的數據的形狀。我想假設兩種情況:

  • 首先,你有很多Request的數據,你想加入他們的行列,以相對小的主數據量。

  • 二,Request數據和數據加入的數據量同樣大,超過您的羣集的RAM。

在第一種情況下,你可以指點火花(原則上也應該能夠自動判斷)使用一種叫做BroadcastHashJoin。策略是將小表廣播到每個Spark工作人員,並將其與更大的RDD中的每個元素進行連接。將會有兩個以上的非空分區,所以Spark可以在兩個以上的工作節點上運行得更快。在合同ShuffleHashJoin將採取所有行,並隨密鑰洗牌。整個表只有2個非空分區,並且向作業中添加更多工作節點將無濟於事。所以能夠做到BroadcastHashJoin以最少的努力確保可擴展性。有關DataBricks鏈接的筆記本的更多詳細信息。

在第二種情況下,您的策略並不是一個壞主意。我們有很好的預處理數據的經驗,可以加入像RocksDBAWS DynamoDB這樣的外部KV-Store,然後進行查找並以流媒體的方式加入。但是,即使在一個小羣集上也可以將這個過程擴展到真正的海量數據集,但性能和工作量要比純粹的Spark內存方法高得多。