我實際上用盡了選項。 在我的火花流應用程序。我想保留一些關鍵的狀態。我從卡夫卡收到活動。然後我從事件中提取密鑰,比如userID。當沒有來自Kafka的事件時,我想每3秒鐘更新一次計數器,以保證每個用戶ID相對於每個用戶ID,因爲我用3秒配置了StreamingContext的batchduration。在火花流中重播RDD以更新累加器
我現在做可能非常醜陋的方式,但至少它的工作原理:我有一個這樣的accumulableCollection:
val userID = ssc.sparkContext.accumulableCollection(new mutable.HashMap[String,Long]())
然後我創建了一個「假」事件,並把它推到我的火花流媒體上下文如下:
val rddQueue = new mutable.SynchronizedQueue[RDD[String]]()
for (i <- 1 to 100) {
rddQueue += ssc.sparkContext.makeRDD(Seq("FAKE_MESSAGE"))
Thread.sleep(3000)
}
val inputStream = ssc.queueStream(rddQueue)
inputStream.foreachRDD(UPDATE_MY_ACCUMULATOR)
這將讓我訪問我的accumulatorCollection並更新所有用戶ID的所有計數器。截至目前一切正常,但是當我改變我的循環來源:
for (i <- 1 to 100) {} #This is for test
要:
while (true) {} #This is to let me access and update my accumulator through the whole application life cycle
後來,當我跑我的./spark-submit,我的應用程序卡住在這個舞臺上:
15/12/10 18:09:00 INFO BlockManagerMasterActor: Registering block manager slave1.cluster.example:38959 with 1060.3 MB RAM, BlockManagerId(1, slave1.cluster.example, 38959)
任何有關如何解決這個問題的線索?有沒有一種非常簡單的方法可以讓我更新我的用戶標識的值(而不是創建一個無用的RDD並定期將它推送到queuestream)?