2016-10-17 135 views
0

我試圖爲多個RabbitMQ隊列設置Spark流。如下所述,我設置了2個工作者,每個工作者都有一個內核和2GB的內存。所以,問題是當我保持這個參數爲conf.set("spark.cores.max","2")流不處理任何數據,只是繼續添加作業。但是,一旦我將其設置爲conf.set("spark.cores.max","3")流式處理開始處理它。所以,我無法理解這個原因。另外,如果我想從兩個隊列中並行處理數據,我應該怎麼做。我在下面提到了我的代碼和配置設置。並行處理多個rabbitmq隊列的Spark Streaming處理

Spark-env.sh:

SPARK_WORKER_MEMORY=2g 
SPARK_WORKER_INSTANCES=1 
SPARK_WORKER_CORES=1 

Scala代碼:

val rabbitParams = Map("storageLevel" -> "MEMORY_AND_DISK_SER_2","queueName" -> config.getString("queueName"),"host" -> config.getString("QueueHost"), "exchangeName" -> config.getString("exchangeName"), "routingKeys" -> config.getString("routingKeys")) 
    val receiverStream = RabbitMQUtils.createStream(ssc, rabbitParams) 
    receiverStream.start()  

    val predRabbitParams = Map("storageLevel" -> "MEMORY_AND_DISK_SER_2", "queueName" -> config.getString("queueName1"), "host" -> config.getString("QueueHost"), "exchangeName" -> config.getString("exchangeName1"), "routingKeys" -> config.getString("routingKeys1")) 
    val predReceiverStream = RabbitMQUtils.createStream(ssc, predRabbitParams) 
    predReceiverStream.start() 

回答

1

這種行爲在Streaming Guide解釋。每個接收器都是一個長時間運行的進程,佔用一個線程。

如果可用線程的數量小於或等於接收器的數量有剩餘任務處理沒有資源:

分配給星火流媒體應用內核的數量必須比多接收器數量。否則系統將接收數據,但無法處理它。