我試圖在一個名爲「test」的kafka主題上發送一個字數問題(在spark-scala中)的輸出。看到下面的代碼:Kafka producer.send()被producer.close()停止
val Dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
val lines = Dstream.map(f => f._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.foreachRDD(
rdd => rdd.foreach(
f =>
{
val sendProps = new Properties()
sendProps.put("metadata.broker.list", brokers)
sendProps.put("serializer.class", "kafka.serializer.StringEncoder")
sendProps.put("producer.type", "async")
val config = new ProducerConfig(sendProps)
val producer = new Producer[String, String](config)
producer.send(new KeyedMessage[String, String]"test", f._1 + " " +f._2))
producer.close();
}))
問題是一些單詞隨機丟失輸出。我還注意到,如果我刪除了語句
producer.close()
沒有數據丟失。
這是否意味着producer.close()中斷producer.send()之前它其實是把數據緩衝區,由於其特定的元組沒有被髮送到消費者?如果是,我應該如何關閉生產者而不會冒數據丟失的風險?
以上是我最初的問題,Vale的答案解決了這個問題。
現在,當我更改producer.type屬性 - 數據隨機丟失。
sendProps.put("producer.type", "sync")
爲了澄清producer.send運行所有我需要把在輸出主題的話。但是,有些單詞會丟失,並且不會顯示在輸出Kafka主題中。
我實際上是從一個kafka流中讀取數據。是的,每個元組都會創建一個新的製作人。請在看到此內容後告訴我該做什麼。 –
我知道你在使用DStreams。我還不能用scala編寫。如果你看看我的這個鏈接有一個「如何使用foreachRDD」,這表明你這種結構: 'dstream.foreachRDD | -rdd.foreachPartition | - 創建新的生產者,從分區獲取的東西和發送它' – Vale
謝謝淡水河谷。該解決方案工作得很好。我一直在敲我的頭腦超過一天。你救了我的靈魂。 –