Spark Streaming: filtering a stream and saving to different tables based on the id column
I am trying to filter a data stream and, depending on the value of the id column, save each record to a different table.
I have two tables:
- testTable_odd (id, data1, data2)
- testTable_even (id, data1)
If the id value is odd, I want to save the record to the testTable_odd table; if it is even, I want to save it to testTable_even.
The tricky part is that my two tables have different columns. I have tried multiple approaches, including a Scala function whose return type is [obj1, obj2], but I could not get it to work. Any pointers would be greatly appreciated. Here is my code:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector._
import kafka.serializer.StringDecoder
import org.json4s._
import org.json4s.native.JsonMethods._

object StreamProcessor extends Serializable {

  case class WordCount(id: Long, data1: String, data2: String)

  implicit val formats = DefaultFormats

  // Parse one raw Kafka message (a JSON string) into the case class.
  def msgParseMaster(msg: String): WordCount = parse(msg).extract[WordCount]

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamProcessor")
      .set("spark.cassandra.connection.host", "127.0.0.1")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))
    val sqlContext = new SQLContext(sc)
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val topics = args.toSet

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // This is as far as I got: every record lands in a single table,
    // regardless of whether its id is odd or even.
    stream
      .map { case (_, msg) =>
        val result = msgParseMaster(msg)
        (result.id, result.data1)
      }
      .foreachRDD(rdd =>
        if (!rdd.isEmpty) rdd.saveToCassandra("testKS", "testTable", SomeColumns("id", "data1")))

    ssc.start()
    ssc.awaitTermination()
  }
}
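To make the goal concrete, the shape I have been aiming for is roughly the following: filter the parsed stream once per parity and save each branch with its own SomeColumns. This is only a sketch of the idea (it reuses msgParseMaster and the table schemas from above, and assumes both tables live in the testKS keyspace), not code I have gotten to work:

stream
  .map { case (_, msg) => msgParseMaster(msg) }
  .foreachRDD { rdd =>
    if (!rdd.isEmpty) {
      rdd.cache() // the RDD is filtered twice below, so avoid recomputing it
      // Odd ids carry both data columns.
      rdd.filter(_.id % 2 != 0)
        .map(wc => (wc.id, wc.data1, wc.data2))
        .saveToCassandra("testKS", "testTable_odd", SomeColumns("id", "data1", "data2"))
      // Even ids carry only data1.
      rdd.filter(_.id % 2 == 0)
        .map(wc => (wc.id, wc.data1))
        .saveToCassandra("testKS", "testTable_even", SomeColumns("id", "data1"))
      rdd.unpersist()
    }
  }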
Thanks. I solved this in a different way, using filtering. – Suresh
Hey, would you mind posting your solution so it can be marked as the correct answer? I'd like to see it too :P –
Will definitely do that today. Just to outline the process, these are the steps I followed: 1) extract the details from the raw JSON string into a case class 2) create a super JSON (with the details needed by both filter conditions) 3) convert that JSON into a DataFrame 4) run select and where clauses on it 5) save to Cassandra – Suresh
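For anyone landing here later, here is a rough sketch of what those five steps could look like with the Spark 1.x DataFrame API, dropped into the question's code in place of the original map/foreachRDD. It is only a sketch: msgParseMaster, the WordCount case class, and the testKS keyspace come from the question, while the exact select/where split per table is an assumption based on the two schemas, since the full solution was never posted.

stream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    // Steps 1-3: parse each raw message into the case class, then build a DataFrame
    // from it -- the "super JSON" carries every field either table might need.
    val df = sqlContext.createDataFrame(rdd.map { case (_, msg) => msgParseMaster(msg) })
    // Step 4: one select/where pair per target table (assumes non-negative ids).
    val odd  = df.filter("id % 2 = 1").select("id", "data1", "data2")
    val even = df.filter("id % 2 = 0").select("id", "data1")
    // Step 5: save each DataFrame to its own Cassandra table.
    odd.write.format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "testKS", "table" -> "testTable_odd"))
      .mode(SaveMode.Append).save()
    even.write.format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "testKS", "table" -> "testTable_even"))
      .mode(SaveMode.Append).save()
  }
}

Compared with the two-filter RDD sketch above, this keeps all routing logic in DataFrame expressions, which makes it easy to add further conditions or tables later.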