我想從Kafka讀取數據並通過Spark RDD存儲到Cassandra表中。值分裂不是(String,String)的成員
獲取錯誤,而編譯代碼:
/root/cassandra-count/src/main/scala/KafkaSparkCassandra.scala:69: value split is not a member of (String, String)
[error] val lines = messages.flatMap(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
下面的代碼:當我通過互動spark-shell
手動運行該代碼,它工作正常,但而錯誤編譯代碼來。
// Create direct kafka stream with brokers and topics
val topicsSet = Set[String] (kafka_topic)
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafka_broker)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
// Create the processing logic
// Get the lines, split
val lines = messages.map(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
lines.saveToCassandra("stream_poc", "US_city", SomeColumns("city_name", "jan_temp", "lat", "long"))
@RameshMaharjan:請不要格式化專有名詞作爲代碼。卡夫卡和卡桑德拉只需要一個初始資金,就是這樣 - 他們本身並不是代碼。然而,像'spark-shell'這樣的東西都可以,因爲代碼格式適合於控制檯I/O(假定'spark-shell'是一個鍵入的命令)。 – halfer