2016-08-22 57 views
1

我正在將消息從Message Hub傳輸到Sparkmix中的Spark實例。我正在使用java客戶端向Message Hub發送簡單的json消息。Bluemix上的Spark上下文向JSON負載添加null

JSON消息 -

{"country":"Netherlands","dma_code":"0","timezone":"Europe\/Amsterdam","area_code":"0","ip":"46.19.37.108","asn":"AS196752","continent_code":"EU","isp":"Tilaa V.O.F.","longitude":5.75,"latitude":52.5,"country_code":"NL","country_code3":"NLD"} 

當我在星火開始流,我收到消息的開頭有一個額外空。

(null,{"country":"Netherlands","dma_code":"0","timezone":"Europe\/Amsterdam","area_code":"0","ip":"46.19.37.108","asn":"AS196752","continent_code":"EU","isp":"Tilaa V.O.F.","longitude":5.75,"latitude":52.5,"country_code":"NL","country_code3":"NLD"}) 

請讓我知道爲什麼Spark環境將這個null放在前面。我怎樣才能刪除它?

KafkaSender碼 -

KafkaProducer<String, String> kafkaProducer; 
    kafkaProducer = new KafkaProducer<String, String>(props); 
    ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,message); 

    RecordMetadata recordMetadata = kafkaProducer.send(producerRecord).get(); 
    //getting RecordMetadata is possible to validate topic, partition and offset 
    System.out.println("topic where message is published : " + recordMetadata.topic()); 
    System.out.println("partition where message is published : " + recordMetadata.partition()); 
    System.out.println("message offset # : " + recordMetadata.offset()); 
    kafkaProducer.close(); 

感謝 拉吉

+0

您的問題似乎與此類似 http://stackoverflow.com/questions/36888224/null-value-in-spark-streaming-from-kafka 你的關鍵是缺少這樣的消息中樞(卡夫卡)爲你添加null,我懷疑spark的上下文增加了這一點。 –

回答

0

你的關鍵是空 - 第一個值是你的鑰匙,第二個是你的課程價值。

我建議你發佈將消息發佈到Kafka/MessageHub的代碼以獲得更好的答案。

解決你的問題 - 如果你的目標只是打印出來,你可以做這樣的事情,而不是打印數據到標準輸出並忽略null鍵。

stream.foreachRDD(recordRDD => { 
    recordRDD.foreach(record => print(record._2)) 
}) 
+0

在我的問題中添加了kafka發件人代碼。 – rchaudhary123

+0

再一次,有問題的生產者代碼沒有發送密鑰。我不確定你的問題是什麼?你得到了一個String,String的元組。只需使用元組的_2來獲得你的值。 – zpp

相關問題