1
我有一個卡夫卡生產者,其從目錄中讀取和KafkaUtils.createDirectStream一個String對象星火
def main(args: Array[String]) {
val Array(brokers, topic, messagesPerSec, wordsPerMessage) = Array("quickstart.cloudera:9092", "test","10","10")
val directoryPath = "/home/cloudera/Documents/config/"
// Zookeeper connection properties
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val myDirectory= new File(directoryPath)
var lines =""
for (file <- myDirectory.listFiles) {
lines = scala.io.Source.fromFile(file).mkString
val message = new ProducerRecord[String, String](topic, null, lines)
producer.send(message)
print(lines)
Thread.sleep(1000)
}
同樣我使用的火花直接流爲我的消費文件的內容寫入到主題
val lines = KafkaUtils.createDirectStream[Array[Byte], String, DefaultDecoder, StringDecoder](ssc, kafkaConf, Set(topic)).map(_._2)
val str = lines.print(10)
我可以打印文件的內容。 我正在使用單個主題。 我必須從此DStream中獲取RDD,並將整個內容轉換爲字符串對象,以便將其傳遞給方法。 有人可以幫忙嗎?