2014-09-11 36 views
3

我正在嘗試使用自定義接收器編寫Spark Streaming應用程序。我應該通過提供具有預定義間隔的隨機值來模擬實時輸入數據。 (簡化)接收器如下所示,與根據星火流應用程序下面的代碼:自定義接收器在Spark Streaming中排除工作

class SparkStreamingReceiver extends Actor with ActorHelper { 

    private val random = new Random() 

    override def preStart = { 
    context.system.scheduler.schedule(500 milliseconds, 1000 milliseconds)({ 
     self ! ("string", random.nextGaussian()) 
    }) 
    } 

    override def receive = { 
    case data: (String, Double) => { 
     store[(String, Double)](data) 
    } 
    } 
} 
val conf: SparkConf = new SparkConf() 
conf.setAppName("Spark Streaming App") 
    .setMaster("local") 

val ssc: StreamingContext = new StreamingContext(conf, Seconds(2)) 

val randomValues: ReceiverInputDStream[(String, Double)] = 
    ssc.actorStream[(String,Double)](Props(new SparkStreamingReceiver()), "Receiver") 

randomValues.saveAsTextFiles("<<OUTPUT_PATH>>/randomValues") 

運行這段代碼,我看到接收器正在工作(存儲項目接受單日誌項)。但是,saveAsTextFiles將永遠不會輸出值。

我可以通過更改主線程運行兩個線程(local[2])來解決問題,但是如果我註冊了另一個接收器實例(我打算這麼做),它會重新出現。更具體地說,我需要至少有一個線程超過我的自定義接收器的數量才能獲得任何輸出。

在我看來,好像工作線程被接收器阻塞了。

任何人都可以解釋這種效應,並可能如何解決我的代碼?

回答

7

每個接收器都使用一個計算插槽。所以2個接收器需要2個計算插槽。如果所有的計算時隙都由接收器獲取,則沒有剩餘時隙來處理數據。這就是爲什麼具有1個接收器的「本地」模式和具有2個接收器的「本地[2]」阻止處理。

+1

我想我明白你的意思,但實際上,你的意思是說當地的[2]救援處理。對 ? – jayunit100 2014-10-25 21:11:33

+1

另外,當我們在接收器中啓動一個新的java線程時,是否需要一個計算插槽?計算插槽如何在本地模式下標記爲java線程映射.... – jayunit100 2014-10-25 21:14:07