0

如何在kafka中的所有分區中生成火花寫入消息,以便我可以使用directstream並提高流的性能。當我使用火花流將消息寫入kafka主題時,它只寫入一個分區

這裏是我的代碼: -

object kafka { 
def main(args: Array[String]) { 
    val sparkConf = new SparkConf().setAppName("FlightawareSparkApp") 
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
    val ssc = new StreamingContext(sparkConf, Seconds(3)) 
    val lines = ssc.socketTextStream("localhost", 18436) 
         val topic = "test" 
         val props = new java.util.Properties() 
         props.put("metadata.broker.list", "list") 
         props.put("bootstrap.servers", "list") 
         // props.put("bootstrap.servers", "localhost:9092") 
         // props.put("bootstrap.servers", "localhost:9092") 
         props.put("client.id", "KafkaProducer") 
         props.put("producer.type", "async") 
         props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer") 
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
      lines.foreachRDD(rdd => { 
       rdd.foreachPartition(part => { 
        val producer = new KafkaProducer[Integer, String](props) 
        part.foreach(msg =>{ 
         val record = new ProducerRecord[Integer, String](topic, msg) 
         producer.send(record) 
        }) 
        producer.close() 
       }) 
      }) 
      ssc.start() 
      ssc.awaitTermination() 
} 

}

這段代碼是信息推送到卡夫卡的話題,但是當我看到使用

/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $KAFKABROKERS --topic test --time -1 

我得到輸出的計數在那裏我可以僅在一個分區中看到消息。

test:8:0 
test:2:0 
test:5:0 
test:4:0 
test:7:0 
test:1:0 
test:9:0 
test:3:0 
test:6:237629 
test:0:0 

有關如何將數據拆分到所有分區的任何建議。

如何在程序中默認實現分區鍵,以便跨分區分佈消息。

謝謝,

Ankush雷迪。

回答

2

這是因爲你沒有設置密鑰。您可以在Kafka FAQ [1]中找到以下詳細信息。

爲什麼分區鍵未指定時數據在分區間不均勻分佈?

在Kafka生產者中,可以指定一個分區鍵來表示消息的目標分區。默認情況下,基於散列的分區程序用於確定給定密鑰的分區標識,人們也可以使用自定義分區程序。

要減少開放套接字的數量,在0.8.0(https://issues.apache.org/jira/browse/KAFKA-1017)中,當未指定分區鍵或爲空時,製作者將選擇一個隨機分區並在此之前堅持一段時間(默認爲10分鐘)切換到另一個。因此,如果生產者數量少於分區數量,那麼在某個特定時間點,某些分區可能不會收到任何數據。爲了緩解這個問題,可以減少元數據刷新間隔,或者指定消息密鑰和定製的隨機分區。欲瞭解更多詳情請參閱本主題http://mail-archives.apache.org/mod_mbox/kafka-dev/201310.mbox/%3CCAFbh0Q0aVh%2Bvqxfy7H-%2BMnRFBt6BnyoZk1LWBoMspwSmTqUKMg%40mail.gmail.com%3E

[1] https://cwiki.apache.org/confluence/display/KAFKA/FAQ

+0

謝謝你的回覆。如果我在這裏傳遞任何隨機密鑰,那麼它將工作。 val record = new ProducerRecord [Integer,String](topic,key,8,msg)。 –

相關問題