2016-05-27 228 views
0

我試圖在一個名爲「test」的kafka主題上發送一個字數問題(在spark-scala中)的輸出。看到下面的代碼:Kafka producer.send()被producer.close()停止

val Dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 

val lines = Dstream.map(f => f._2) 
val words = lines.flatMap(_.split(" ")) 
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) 

wordCounts.foreachRDD(
     rdd => rdd.foreach(
     f => 
      { 
      val sendProps = new Properties() 
      sendProps.put("metadata.broker.list", brokers) 
      sendProps.put("serializer.class", "kafka.serializer.StringEncoder") 
      sendProps.put("producer.type", "async") 

      val config = new ProducerConfig(sendProps) 
      val producer = new Producer[String, String](config) 
      producer.send(new KeyedMessage[String, String]"test", f._1 + " " +f._2)) 
      producer.close(); 

      })) 

問題是一些單詞隨機丟失輸出。我還注意到,如果我刪除了語句

producer.close() 

沒有數據丟失。

這是否意味着producer.close()中斷producer.send()之前它其實是把數據緩衝區,由於其特定的元組沒有被髮送到消費者?如果是,我應該如何關閉生產者而不會冒數據丟失的風險?

以上是我最初的問題,Vale的答案解決了這個問題。

現在,當我更改producer.type屬性 - 數據隨機丟失。

sendProps.put("producer.type", "sync") 

爲了澄清producer.send運行所有我需要把在輸出主題的話。但是,有些單詞會丟失,並且不會顯示在輸出Kafka主題中。

回答

1

這很奇怪。 close()方法應該等待send完成,這就是爲什麼引入close(time)方法的原因:as you can see here
因此,我使用Java 7. rdd.foreach是否在其中的每個分區上運行?或者它在每個元組上運行(我認爲它正在執行)?
如果後者,你可以嘗試一個rdd.foreachPartition(refer to this)?因爲你正在爲每一行創建一個製作人,而且我擔心這可能會導致問題(儘管理論上它不應該)。

+0

我實際上是從一個kafka流中讀取數據。是的,每個元組都會創建一個新的製作人。請在看到此內容後告訴我該做什麼。 –

+1

我知道你在使用DStreams。我還不能用scala編寫。如果你看看我的這個鏈接有一個「如何使用foreachRDD」,這表明你這種結構: 'dstream.foreachRDD | -rdd.foreachPartition | - 創建新的生產者,從分區獲取的東西和發送它' – Vale

+0

謝謝淡水河谷。該解決方案工作得很好。我一直在敲我的頭腦超過一天。你救了我的靈魂。 –