2016-12-30 62 views
1

我正在使用Spark Streaming創建系統以豐富雲數據庫中的傳入數據。示例 -在PySpark環境中創建緩存的最佳方式

Incoming Message: {"id" : 123} 
Outgoing Message: {"id" : 123, "data": "xxxxxxxxxxxxxxxxxxx"} 

我對驅動程序類代碼如下:

from Sample.Job import EnrichmentJob 
from Sample.Job import FunctionJob 
import pyspark 
from pyspark.streaming.kafka import KafkaUtils 
from pyspark import SparkContext, SparkConf, SQLContext 
from pyspark.streaming import StreamingContext 
from pyspark.sql import SparkSession 

from kafka import KafkaConsumer, KafkaProducer 
import json 

class SampleFramework(): 

    def __init__(self): 
     pass 

    @staticmethod 
    def messageHandler(m): 
     return json.loads(m.message) 

    @staticmethod 
    def processData(rdd): 

     if (rdd.isEmpty()): 
      print("RDD is Empty") 
      return 

     # Expand 
     expanded_rdd = rdd.mapPartitions(EnrichmentJob.enrich) 

     # Score 
     scored_rdd = expanded_rdd.map(FunctionJob.function) 

     # Publish RDD 


    def run(self, ssc): 

     self.ssc = ssc 

     directKafkaStream = KafkaUtils.createDirectStream(self.ssc, QUEUENAME, \ 
                  {"metadata.broker.list": META, 
                  "bootstrap.servers": SERVER}, \ 
                  messageHandler= SampleFramework.messageHandler) 

     directKafkaStream.foreachRDD(SampleFramework.processData) 

     ssc.start() 
     ssc.awaitTermination() 

代碼的富集工作如下: 類EnrichmentJob:

cache = {} 

@staticmethod 
def enrich(data): 

    # Assume that Cloudant Connector using the available config 
    cloudantConnector = CloudantConnector(config, config["cloudant"]["host"]["req_db_name"]) 
    final_data = [] 
    for row in data: 
     id = row["id"] 
     if(id not in EnrichmentJob.cache.keys()): 
      data = cloudantConnector.getOne({"id": id}) 
      row["data"] = data 
      EnrichmentJob.cache[id]=data 
     else: 
      data = EnrichmentJob.cache[id] 
      row["data"] = data 
     final_data.append(row) 

    cloudantConnector.close() 

    return final_data 

我的問題是 - 有沒有辦法保持[1]「主存儲器上的全局緩存,所有工作人員都可以訪問」或者[2]「每個工作人員的本地緩存,這樣他們仍然保持在前臺achRDD設置「?

我已經探討了以下 -

  1. 廣播變量 - 在這裏,我們去了[1]的方式。據我瞭解,它們的意圖是隻讀和不可變的。我已經檢查了這個reference,但它引用了一個不執行/保持廣播變量的例子。這是一個很好的做法嗎?

  2. 靜態變量 - 這裏我們走[2]的方法。被引用的類(在本例中爲「Enricher」)以靜態變量字典的形式維護緩存。但事實證明,ForEachRDD函數爲每個傳入的RDD生成一個全新的進程,並刪除了以前啓動的靜態變量。這是上面編碼的那個。

我現在所擁有的兩種可能的解決方案 -

  1. 維護文件系統上的脫機緩存。
  2. 在我的驅動程序節點上執行此豐富任務的整個計算。這會導致整個數據以驅動程序結束並在那裏維護。緩存對象將作爲映射函數的參數發送到濃縮作業。

這裏顯然第一個看起來比第二個好,但我希望得出結論,這兩個是唯一的方法,在承諾之前。任何指針將不勝感激!

回答

1

有什麼方法來維持[1]「主內存,所有員工都可以訪問的全局高速緩存」

號沒有「主存」,它可以被所有訪問工人。每個工作人員在一個單獨的進程中運行,並使用套接字與外部世界通信。更不用說在非本地模式下分離不同物理節點。

有一些技術可以應用於實現工作區域緩存內存映射數據(使用SQLite是最簡單的),但它需要一些額外的努力來實現正確的方式(避免衝突等)。

或[2]「每個工人的本地緩存,以便它們保持在foreachRDD設置」?

您可以使用標準緩存技術,其範圍僅限於單個工作進程。根據配置(靜態與dynamic resource allocationspark.python.worker.reuse),可能會或可能不會保留多個任務和批次之間的配置。

考慮以下,簡化,例如:

  • map_param.py

    from pyspark import AccumulatorParam 
    from collections import Counter 
    
    class CounterParam(AccumulatorParam): 
        def zero(self, v: Counter) -> Counter: 
         return Counter() 
    
        def addInPlace(self, acc1: Counter, acc2: Counter) -> Counter: 
         acc1.update(acc2) 
         return acc1 
    
  • my_utils.py

    from pyspark import Accumulator 
    from typing import Hashable 
    from collections import Counter 
    
    # Dummy cache. In production I would use functools.lru_cache 
    # but it is a bit more painful to show with accumulator 
    cached = {} 
    
    def f_cached(x: Hashable, counter: Accumulator) -> Hashable: 
        if cached.get(x) is None: 
         cached[x] = True 
         counter.add(Counter([x])) 
        return x 
    
    
    def f_uncached(x: Hashable, counter: Accumulator) -> Hashable: 
        counter.add(Counter([x])) 
        return x 
    
  • main.py

    from pyspark.streaming import StreamingContext 
    from pyspark import SparkContext 
    
    from counter_param import CounterParam 
    import my_utils 
    
    from collections import Counter 
    
    def main(): 
        sc = SparkContext("local[1]") 
        ssc = StreamingContext(sc, 5) 
    
        cnt_cached = sc.accumulator(Counter(), CounterParam()) 
        cnt_uncached = sc.accumulator(Counter(), CounterParam()) 
    
        stream = ssc.queueStream([ 
         # Use single partition to show cache in work 
         sc.parallelize(data, 1) for data in 
         [[1, 2, 3], [1, 2, 5], [1, 3, 5]] 
        ]) 
    
        stream.foreachRDD(lambda rdd: rdd.foreach(
         lambda x: my_utils.f_cached(x, cnt_cached))) 
        stream.foreachRDD(lambda rdd: rdd.foreach(
         lambda x: my_utils.f_uncached(x, cnt_uncached))) 
    
        ssc.start() 
        ssc.awaitTerminationOrTimeout(15) 
        ssc.stop(stopGraceFully=True) 
    
        print("Counter cached {0}".format(cnt_cached.value)) 
        print("Counter uncached {0}".format(cnt_uncached.value)) 
    
    if __name__ == "__main__": 
        main() 
    

實例運行:

bin/spark-submit main.py 
Counter cached Counter({1: 1, 2: 1, 3: 1, 5: 1}) 
Counter uncached Counter({1: 3, 2: 2, 3: 2, 5: 2}) 

正如你可以看到,我們得到預期的結果:

  • 對於 「緩存」 的對象累加器每唯一的密鑰更新一次每個工作者進程(分區)。
  • 對於未緩存的對象,累加器在每次發生密鑰時都會更新。