如何在kafka中的所有分區中生成火花寫入消息,以便我可以使用directstream並提高流的性能。當我使用火花流將消息寫入kafka主題時,它只寫入一個分區
這裏是我的代碼: -
object kafka {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("FlightawareSparkApp")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val lines = ssc.socketTextStream("localhost", 18436)
val topic = "test"
val props = new java.util.Properties()
props.put("metadata.broker.list", "list")
props.put("bootstrap.servers", "list")
// props.put("bootstrap.servers", "localhost:9092")
// props.put("bootstrap.servers", "localhost:9092")
props.put("client.id", "KafkaProducer")
props.put("producer.type", "async")
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
lines.foreachRDD(rdd => {
rdd.foreachPartition(part => {
val producer = new KafkaProducer[Integer, String](props)
part.foreach(msg =>{
val record = new ProducerRecord[Integer, String](topic, msg)
producer.send(record)
})
producer.close()
})
})
ssc.start()
ssc.awaitTermination()
}
}
這段代碼是信息推送到卡夫卡的話題,但是當我看到使用
/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $KAFKABROKERS --topic test --time -1
我得到輸出的計數在那裏我可以僅在一個分區中看到消息。
test:8:0
test:2:0
test:5:0
test:4:0
test:7:0
test:1:0
test:9:0
test:3:0
test:6:237629
test:0:0
有關如何將數據拆分到所有分區的任何建議。
如何在程序中默認實現分區鍵,以便跨分區分佈消息。
謝謝,
Ankush雷迪。
謝謝你的回覆。如果我在這裏傳遞任何隨機密鑰,那麼它將工作。 val record = new ProducerRecord [Integer,String](topic,key,8,msg)。 –