I am trying to set up Spark Streaming to read from multiple RabbitMQ queues. As shown below, I run 2 workers, each with one core and 2 GB of memory. The problem is that with conf.set("spark.cores.max","2") the stream does not process any data; it just keeps queueing jobs. As soon as I raise it to conf.set("spark.cores.max","3"), processing starts. I cannot work out the reason for this. Also, what should I do if I want to process the data from both queues in parallel? My code and configuration settings are given below.
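For reference, the driver context is created along these lines (a rough sketch; the application name, master URL, and batch interval are placeholders I have filled in, only the spark.cores.max setting is taken verbatim from my code):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("RabbitMQStreaming")  // placeholder app name
  .setMaster("spark://master:7077") // placeholder master URL
conf.set("spark.cores.max", "3")    // "2" leaves jobs queued, "3" processes
val ssc = new StreamingContext(conf, Seconds(10)) // batch interval assumed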
spark-env.sh:
SPARK_WORKER_MEMORY=2g
SPARK_WORKER_INSTANCES=1
SPARK_WORKER_CORES=1
Scala code:
// Receiver for the first queue
val rabbitParams = Map(
  "storageLevel" -> "MEMORY_AND_DISK_SER_2",
  "queueName" -> config.getString("queueName"),
  "host" -> config.getString("QueueHost"),
  "exchangeName" -> config.getString("exchangeName"),
  "routingKeys" -> config.getString("routingKeys"))
val receiverStream = RabbitMQUtils.createStream(ssc, rabbitParams)
receiverStream.start()

// Receiver for the second queue
val predRabbitParams = Map(
  "storageLevel" -> "MEMORY_AND_DISK_SER_2",
  "queueName" -> config.getString("queueName1"),
  "host" -> config.getString("QueueHost"),
  "exchangeName" -> config.getString("exchangeName1"),
  "routingKeys" -> config.getString("routingKeys1"))
val predReceiverStream = RabbitMQUtils.createStream(ssc, predRabbitParams)
predReceiverStream.start()
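One way I am considering to consume both queues in parallel is to union the two streams before acting on them; a minimal sketch (the print() action and the awaitTermination call are only illustrative):

// Union the two receiver streams so a single set of output actions
// drains both queues within each batch; print() is a placeholder action.
val unified = ssc.union(Seq(receiverStream, predReceiverStream))
unified.print()

ssc.start()
ssc.awaitTermination()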