我有一個用Scala編寫的spark工作,我只想寫一行用逗號分隔的行,來自Kafka生產者到Cassandra數據庫。 但我無法調用saveToCassandra。 我看到幾個wordcount的例子,他們正在寫地圖結構到兩列的卡桑德拉表,它似乎工作正常。但是我有很多列,我發現數據結構需要並行化。 這裏是我的代碼示例:使用spark將數據寫入cassandra
object TestPushToCassandra extends SparkStreamingJob {
def validate(ssc: StreamingContext, config: Config): SparkJobValidation = SparkJobValid
def runJob(ssc: StreamingContext, config: Config): Any = {
val bp_conf=BpHooksUtils.getSparkConf()
val brokers=bp_conf.get("bp_kafka_brokers","unknown_default")
val input_topics = config.getString("topics.in").split(",").toSet
val output_topic = config.getString("topic.out")
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, input_topics)
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(","))
val li = words.par
li.saveToCassandra("testspark","table1", SomeColumns("col1","col2","col3"))
li.print()
words.foreachRDD(rdd =>
rdd.foreachPartition(partition =>
partition.foreach{
case x:String=>{
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
val outMsg=x+" from spark"
val producer = new KafkaProducer[String,String](props)
val message=new ProducerRecord[String, String](output_topic,null,outMsg)
producer.send(message)
}
}
)
)
ssc.start()
ssc.awaitTermination()
}
}
我認爲這是斯卡拉的是,我沒有得到正確的語法。 在此先感謝。
words.par的調用幾乎肯定不是正確的做法。 Dstream的「詞彙」已經是一個DStream,它的本質已經被分佈和並行化了。沒有這個,你有什麼問題? – RussS
它工作沒有「.par」,但現在我想知道如何分割值提取col1,col2,col3的值?例如,如果在kafka生產者中寫入「val1,val2,val3」,那麼我如何提取這些值並分別存儲在col1,col2和col3中? – user3925365
你是說你不能.split(「,」)字符串? – RussS