我正在使用Spark Streaming創建系統以豐富雲數據庫中的傳入數據。示例 -在PySpark環境中創建緩存的最佳方式
Incoming Message: {"id" : 123}
Outgoing Message: {"id" : 123, "data": "xxxxxxxxxxxxxxxxxxx"}
我對驅動程序類代碼如下:
from Sample.Job import EnrichmentJob
from Sample.Job import FunctionJob
import pyspark
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from kafka import KafkaConsumer, KafkaProducer
import json
class SampleFramework():
def __init__(self):
pass
@staticmethod
def messageHandler(m):
return json.loads(m.message)
@staticmethod
def processData(rdd):
if (rdd.isEmpty()):
print("RDD is Empty")
return
# Expand
expanded_rdd = rdd.mapPartitions(EnrichmentJob.enrich)
# Score
scored_rdd = expanded_rdd.map(FunctionJob.function)
# Publish RDD
def run(self, ssc):
self.ssc = ssc
directKafkaStream = KafkaUtils.createDirectStream(self.ssc, QUEUENAME, \
{"metadata.broker.list": META,
"bootstrap.servers": SERVER}, \
messageHandler= SampleFramework.messageHandler)
directKafkaStream.foreachRDD(SampleFramework.processData)
ssc.start()
ssc.awaitTermination()
代碼的富集工作如下: 類EnrichmentJob:
cache = {}
@staticmethod
def enrich(data):
# Assume that Cloudant Connector using the available config
cloudantConnector = CloudantConnector(config, config["cloudant"]["host"]["req_db_name"])
final_data = []
for row in data:
id = row["id"]
if(id not in EnrichmentJob.cache.keys()):
data = cloudantConnector.getOne({"id": id})
row["data"] = data
EnrichmentJob.cache[id]=data
else:
data = EnrichmentJob.cache[id]
row["data"] = data
final_data.append(row)
cloudantConnector.close()
return final_data
我的問題是 - 有沒有辦法保持[1]「主存儲器上的全局緩存,所有工作人員都可以訪問」或者[2]「每個工作人員的本地緩存,這樣他們仍然保持在前臺achRDD設置「?
我已經探討了以下 -
廣播變量 - 在這裏,我們去了[1]的方式。據我瞭解,它們的意圖是隻讀和不可變的。我已經檢查了這個reference,但它引用了一個不執行/保持廣播變量的例子。這是一個很好的做法嗎?
靜態變量 - 這裏我們走[2]的方法。被引用的類(在本例中爲「Enricher」)以靜態變量字典的形式維護緩存。但事實證明,ForEachRDD函數爲每個傳入的RDD生成一個全新的進程,並刪除了以前啓動的靜態變量。這是上面編碼的那個。
我現在所擁有的兩種可能的解決方案 -
- 維護文件系統上的脫機緩存。
- 在我的驅動程序節點上執行此豐富任務的整個計算。這會導致整個數據以驅動程序結束並在那裏維護。緩存對象將作爲映射函數的參數發送到濃縮作業。
這裏顯然第一個看起來比第二個好,但我希望得出結論,這兩個是唯一的方法,在承諾之前。任何指針將不勝感激!