I am streaming messages from Message Hub to a Spark instance on Bluemix. I am using a Java client to send simple JSON messages to Message Hub. The Spark context on Bluemix adds a null to the JSON payload.
JSON message -
{"country":"Netherlands","dma_code":"0","timezone":"Europe\/Amsterdam","area_code":"0","ip":"46.19.37.108","asn":"AS196752","continent_code":"EU","isp":"Tilaa V.O.F.","longitude":5.75,"latitude":52.5,"country_code":"NL","country_code3":"NLD"}
When I start the stream in Spark, I receive the message with an extra null at the beginning.
(null,{"country":"Netherlands","dma_code":"0","timezone":"Europe\/Amsterdam","area_code":"0","ip":"46.19.37.108","asn":"AS196752","continent_code":"EU","isp":"Tilaa V.O.F.","longitude":5.75,"latitude":52.5,"country_code":"NL","country_code3":"NLD"})
Please let me know why the Spark context puts this null in front, and how I can remove it.
KafkaSender code -
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Producer configured with String serializers for key and value (set in props)
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
// No key is supplied here, so the record is published with a null key
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, message);
// send() returns a Future; get() blocks until the broker acknowledges the record
RecordMetadata recordMetadata = kafkaProducer.send(producerRecord).get();
// RecordMetadata can be used to validate the topic, partition and offset
System.out.println("topic where message is published : " + recordMetadata.topic());
System.out.println("partition where message is published : " + recordMetadata.partition());
System.out.println("message offset # : " + recordMetadata.offset());
kafkaProducer.close();
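(For reference, ProducerRecord also has a three-argument constructor that accepts a key; the two-argument form used above sends the record with a null key. A minimal illustration, where "some-key" is a made-up value and not part of the original code:)

// Illustrative only: supplying an explicit key means consumers no longer see a null key
ProducerRecord<String, String> keyedRecord = new ProducerRecord<String, String>(topic, "some-key", message);
kafkaProducer.send(keyedRecord).get();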
Thanks, Raj
Your question looks similar to this one: http://stackoverflow.com/questions/36888224/null-value-in-spark-streaming-from-kafka. Your key is missing, so Message Hub (Kafka) adds the null for you; I doubt the Spark context is adding it.
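The original post does not show the Spark consumer code, so here is a minimal sketch of what the value-only extraction could look like, assuming the spark-streaming-kafka (Kafka 0.8) direct API; the class name, broker address, topic name and batch interval below are placeholders. The direct stream delivers each record as a (key, value) tuple, which is why the payload prints as (null, {...}); mapping to the second element of the tuple keeps only the JSON value.

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class ValueOnlyStream {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("value-only-stream");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        // Placeholder connection settings; a real Message Hub setup also needs SASL/SSL properties
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "kafka-broker:9093");
        Set<String> topics = new HashSet<String>(Arrays.asList("mytopic"));

        // The direct stream yields (key, value) pairs; the key is null because the producer set none
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
                kafkaParams, topics);

        // Keep only the JSON payload and drop the (null) key
        JavaDStream<String> jsonPayloads = messages.map(tuple -> tuple._2());
        jsonPayloads.print();

        jssc.start();
        jssc.awaitTermination();
    }
}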