2015-10-27 27 views
17

我有,我相信,火花流一個比較常見的用例:如何更新火花流中的廣播變量?

我有,我想基於一些參考數據

最初篩選對象的流,我認爲這將是一個非常簡單的事情,實現使用廣播可變

public void startSparkEngine { 
    Broadcast<ReferenceData> refdataBroadcast 
     = sparkContext.broadcast(getRefData()); 

    final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> { 
     final ReferenceData refData = refdataBroadcast.getValue(); 
     return obj.getField().equals(refData.getField()); 
    } 

    filteredStream.foreachRDD(rdd -> { 
     rdd.foreach(obj -> { 
      // Final processing of filtered objects 
     }); 
     return null; 
    }); 
} 

然而,儘管不經常發生,我引用的數據將定期更改

我的印象是,我可以修改和再廣播我對司機變量,它會傳播到每個職工,但是Broadcast對象不是Serializable並需要final下。

我還有什麼替代方案?這三個解決方案,我能想到的是:

  1. 移動參考數據查找到forEachPartitionforEachRdd,使其完全駐留在工人。然而,參考數據生活在一個REST API中,所以我還需要以某種方式存儲一個定時器/計數器,以阻止流中每個元素都訪問遠程計算機。

  2. 每當refdata更改時都重新啓動Spark Context,並使用新的廣播變量。

  3. 轉換參考數據的RDD,然後join以這樣的方式,我現在流Pair<MyObject, RefData>,雖然這會船參考數據與每個對象的流。

回答

3

不知道,如果你已經試過這已經但是我想在不關閉SparkContext可以實現更新,以廣播的變量。通過使用unpersist()方法,廣播變量的拷貝在每個執行器上被刪除,並且需要變量需要被重播以便被再次訪問。爲了您的使用情況下,如果要更新您的播放,您可以:

  1. 等待您的執行人,完成對當前一系列數據

  2. Unpersist廣播可變

  3. 更新廣播可變

  4. 轉播到新的參考數據發送到執行器

我從this post得到相當大的影響,但是發出最後一個答覆的人聲稱已經在本地工作。重要的是要注意,您可能想要在非持久存儲上將阻塞設置爲true,這樣您就可以確信執行者已經清除了舊數據(因此在下一次迭代時將不會再讀取陳舊值)。

1

幾乎每個處理流應用程序的人都需要一種將參考數據(從數據庫,文件等)中編織(過濾,查找等)到流數據中的方法。我們有整個兩個部分的一個部分解決方案

  1. 在流操作

    使用

    查找參考數據

    • 創建具有期望的高速緩存TTL
    • 渦卷,在廣播
    • 使用CacheLookup對象作爲流邏輯的一部分的CacheLookup

對於大多數部分這工作正常,但有如下

  • 更新基準數據

    有,儘管在這些線程,即,建議沒有明確的方式實現這一點:殺死以前的廣播變量並創建新的廣播變量。多個未知數,比如這些操作之間的預期值。

  • 這是一個常見的需求,它會幫助如果有一種方法發送信息廣播變量通知更新。由此,可以使「CacheLookup」中的本地緩存無效

    問題的第二部分仍未解決。如果有任何可行的方法,我會感興趣

    16

    擴展答案由@Rohan Aletty。下面是刷新基於一些TTL

    public class BroadcastWrapper { 
    
        private Broadcast<ReferenceData> broadcastVar; 
        private Date lastUpdatedAt = Calendar.getInstance().getTime(); 
    
        private static BroadcastWrapper obj = new BroadcastWrapper(); 
    
        private BroadcastWrapper(){} 
    
        public static BroadcastWrapper getInstance() { 
         return obj; 
        } 
    
        public JavaSparkContext getSparkContext(SparkContext sc) { 
         JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc); 
         return jsc; 
        } 
    
        public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext){ 
         Date currentDate = Calendar.getInstance().getTime(); 
         long diff = currentDate.getTime()-lastUpdatedAt.getTime(); 
         if (var == null || diff > 60000) { //Lets say we want to refresh every 1 min = 60000 ms 
          if (var != null) 
           var.unpersist(); 
          lastUpdatedAt = new Date(System.currentTimeMillis()); 
    
          //Your logic to refresh 
          ReferenceData data = getRefData(); 
    
          var = getSparkContext(sparkContext).broadcast(data); 
         } 
         return var; 
        } 
    } 
    

    您的代碼看起來像廣播變量BroadcastWrapper的示例代碼:

    public void startSparkEngine() { 
    
        final JavaDStream<MyObject> filteredStream = objectStream.transform(stream -> { 
         Broadcast<ReferenceData> refdataBroadcast = BroadcastWrapper.getInstance().updateAndGet(stream.context()); 
    
         stream.filter(obj -> obj.getField().equals(refdataBroadcast.getValue().getField())); 
        }); 
    
        filteredStream.foreachRDD(rdd -> { 
         rdd.foreach(obj -> { 
         // Final processing of filtered objects 
         }); 
         return null; 
        }); 
    } 
    

    這爲我工作多集羣上爲好。 希望這會有所幫助

    +0

    感謝您的解決方案。你知道updateAndGet是否會在Driver節點或Worker節點上執行嗎?包裝本身似乎沒有廣播,所以我認爲它不適用於每個工人節點。如果它在Driver節點上執行,這是否意味着每次Worker都必須在每次嘗試訪問該值時詢問Driver? (這與第一次使用廣播變量的想法相矛盾) – johannesv

    +2

    此函數返回廣播類型對象的引用。廣播類型的對象將具有廣播變量的標識符和塊的數量。當調用refdataBroadcast.getValue()時,如果廣播標識符出現在執行程序內存中,則不會重新計算它。所有這些都發生在執行器上,但是當sparkContext.broadcast被調用時,驅動程序就會出現。所以updateAndGet只有​​在變量刷新並重新播放(只有驅動程序可以照顧)時纔會在驅動程序節點上執行。 – Aastha

    +0

    那是有道理的,謝謝你的解釋! – johannesv