我有,我相信,火花流一個比較常見的用例:如何更新火花流中的廣播變量?
我有,我想基於一些參考數據
最初篩選對象的流,我認爲這將是一個非常簡單的事情,實現使用廣播可變:
public void startSparkEngine {
Broadcast<ReferenceData> refdataBroadcast
= sparkContext.broadcast(getRefData());
final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> {
final ReferenceData refData = refdataBroadcast.getValue();
return obj.getField().equals(refData.getField());
}
filteredStream.foreachRDD(rdd -> {
rdd.foreach(obj -> {
// Final processing of filtered objects
});
return null;
});
}
然而,儘管不經常發生,我引用的數據將定期更改
我的印象是,我可以修改和再廣播我對司機變量,它會傳播到每個職工,但是Broadcast
對象不是Serializable
並需要final
下。
我還有什麼替代方案?這三個解決方案,我能想到的是:
移動參考數據查找到
forEachPartition
或forEachRdd
,使其完全駐留在工人。然而,參考數據生活在一個REST API中,所以我還需要以某種方式存儲一個定時器/計數器,以阻止流中每個元素都訪問遠程計算機。每當refdata更改時都重新啓動Spark Context,並使用新的廣播變量。
轉換參考數據的RDD,然後
join
以這樣的方式,我現在流Pair<MyObject, RefData>
,雖然這會船參考數據與每個對象的流。
感謝您的解決方案。你知道updateAndGet是否會在Driver節點或Worker節點上執行嗎?包裝本身似乎沒有廣播,所以我認爲它不適用於每個工人節點。如果它在Driver節點上執行,這是否意味着每次Worker都必須在每次嘗試訪問該值時詢問Driver? (這與第一次使用廣播變量的想法相矛盾) – johannesv
此函數返回廣播類型對象的引用。廣播類型的對象將具有廣播變量的標識符和塊的數量。當調用refdataBroadcast.getValue()時,如果廣播標識符出現在執行程序內存中,則不會重新計算它。所有這些都發生在執行器上,但是當sparkContext.broadcast被調用時,驅動程序就會出現。所以updateAndGet只有在變量刷新並重新播放(只有驅動程序可以照顧)時纔會在驅動程序節點上執行。 – Aastha
那是有道理的,謝謝你的解釋! – johannesv