2016-02-04 104 views
2

我有一個用Scala編寫的spark工作,我只想寫一行用逗號分隔的行,來自Kafka生產者到Cassandra數據庫。 但我無法調用saveToCassandra。 我看到幾個wordcount的例子,他們正在寫地圖結構到兩列的卡桑德拉表,它似乎工作正常。但是我有很多列,我發現數據結構需要並行化。 這裏是我的代碼示例:使用spark將數據寫入cassandra

object TestPushToCassandra extends SparkStreamingJob { 
def validate(ssc: StreamingContext, config: Config): SparkJobValidation = SparkJobValid 

def runJob(ssc: StreamingContext, config: Config): Any = { 

val bp_conf=BpHooksUtils.getSparkConf() 
val brokers=bp_conf.get("bp_kafka_brokers","unknown_default") 


val input_topics = config.getString("topics.in").split(",").toSet 


val output_topic = config.getString("topic.out") 


val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, input_topics) 


val lines = messages.map(_._2) 
val words = lines.flatMap(_.split(",")) 

val li = words.par 

li.saveToCassandra("testspark","table1", SomeColumns("col1","col2","col3")) 
li.print() 



words.foreachRDD(rdd => 
    rdd.foreachPartition(partition => 
    partition.foreach{ 
     case x:String=>{ 

     val props = new HashMap[String, Object]() 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
      "org.apache.kafka.common.serialization.StringSerializer") 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
      "org.apache.kafka.common.serialization.StringSerializer") 

     val outMsg=x+" from spark" 
     val producer = new KafkaProducer[String,String](props) 
     val message=new ProducerRecord[String, String](output_topic,null,outMsg) 
     producer.send(message) 
     } 
    } 


) 
) 


ssc.start() 
ssc.awaitTermination() 
} 
} 

我認爲這是斯卡拉的是,我沒有得到正確的語法。 在此先感謝。

+1

words.par的調用幾乎肯定不是正確的做法。 Dstream的「詞彙」已經是一個DStream,它的本質已經被分佈和並行化了。沒有這個,你有什麼問題? – RussS

+0

它工作沒有「.par」,但現在我想知道如何分割值提取col1,col2,col3的值?例如,如果在kafka生產者中寫入「val1,val2,val3」,那麼我如何提取這些值並分別存儲在col1,col2和col3中? – user3925365

+0

你是說你不能.split(「,」)字符串? – RussS

回答

1

您需要將單詞DStream更改爲連接器可以處理的內容。

如元組

val words = lines 
    .map(_.split(",")) 
    .map(wordArr => (wordArr(0), wordArr(1), wordArr(2)) 

或案例類

case class YourRow(col1: String, col2: String, col3: String) 
val words = lines 
    .map(_.split(",")) 
    .map(wordArr => YourRow(wordArr(0), wordArr(1), wordArr(2))) 

或CassandraRow

這是因爲如果你把一個陣中還有所有的本身就可以在一個陣列C *你試圖插入而不是3列。

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md

+0

感謝您的回答。當我嘗試解決方案時,它在數據庫中的存儲位有所不同。也許我錯過了一些小的語法。如果我通過abc,def,ghi,並且這裏是我的代碼,則從Kafka生產商處獲得:val lines = messages.map(_._ 2) val words = lines.flatMap(_。split(「」)) val innerWords = words.flatMap (_.split(「,」)) val wordCounts = innerWords.map(wordArr =(wordArr(0),wordArr(1),wordArr(2))) wordCounts.saveToCassandra(「keyspace01」,「table1」 ,SomeColumns(「col1」,「col2」,「col3」)) 這段代碼在數據庫中產生三個條目,即1st:a,b,c 2nd:d,e,f 3rd:g,h,i – user3925365

+0

woops我不應該複製你的線條,這應該是地圖,而不是flatMap地圖 – RussS

+0

如果我使用地圖與詞首先分裂,然後它給了我編譯錯誤words.foreachRDD函數在行「case x :串」。它說監察人員與模式類型不相容; found:字符串 必需:數組[String] – user3925365

相關問題