2015-12-10 79 views
2

我實際上用盡了選項。 在我的火花流應用程序。我想保留一些關鍵的狀態。我從卡夫卡收到活動。然後我從事件中提取密鑰,比如userID。當沒有來自Kafka的事件時,我想每3秒鐘更新一次計數器,以保證每個用戶ID相對於每個用戶ID,因爲我用3秒配置了StreamingContext的batchduration。在火花流中重播RDD以更新累加器

我現在做可能非常醜陋的方式,但至少它的工作原理:我有一個這樣的accumulableCollection:

val userID = ssc.sparkContext.accumulableCollection(new mutable.HashMap[String,Long]()) 

然後我創建了一個「假」事件,並把它推到我的火花流媒體上下文如下:

val rddQueue = new mutable.SynchronizedQueue[RDD[String]]() 
for (i <- 1 to 100) { 
    rddQueue += ssc.sparkContext.makeRDD(Seq("FAKE_MESSAGE")) 
    Thread.sleep(3000) 
} 
val inputStream = ssc.queueStream(rddQueue) 

inputStream.foreachRDD(UPDATE_MY_ACCUMULATOR) 

這將讓我訪問我的accumulatorCollection並更新所有用戶ID的所有計數器。截至目前一切正常,但是當我改變我的循環來源:

for (i <- 1 to 100) {} #This is for test 

要:

while (true) {} #This is to let me access and update my accumulator through the whole application life cycle 

後來,當我跑我的./spark-submit,我的應用程序卡住在這個舞臺上:

15/12/10 18:09:00 INFO BlockManagerMasterActor: Registering block manager slave1.cluster.example:38959 with 1060.3 MB RAM, BlockManagerId(1, slave1.cluster.example, 38959) 

任何有關如何解決這個問題的線索?有沒有一種非常簡單的方法可以讓我更新我的用戶標識的值(而不是創建一個無用的RDD並定期將它推送到queuestream)?

回答

3

while (true) ...版本不起作用的原因是控件永遠不會返回到主執行線,因此該行下面的任何內容都不會執行。爲了解決這個特定的問題,我們應該在一個單獨的線程中執行while循環。 Future { while() ...}應該可能工作。 此外,在上面的示例中填充QueueDStream時不需要使用Thread.sleep(3000)。 Spark Streaming將在每個流式傳輸間隔中從隊列中消耗一條消息。

觸發「滴答」消息流入的更好方法是使用在每個流式間隔播放相同RDD的ConstantInputDStream,因此不需要使用QueueDStream創建RDD流入。

這就是說,它看起來目前的做法似乎很脆弱,需要修訂。