0

我試圖過濾數據流,並根據ID列的值,我想將數據保存到不同的表星火流過濾的流媒體數據

我有兩個表

  1. testTable_odd(ID,數據1,數據2)
  2. testTable_even(ID,數據1)

如果id的值是奇數的話,我想記錄保存到testTable_odd表,如果值爲偶數則我想將記錄保存到testTable_even。

這裏棘手的部分是我的兩個表有不同的列。嘗試了多種方式,認爲Scala函數的返回類型是[obj1,obj2],但我無法成功,任何指針都將不勝感激。

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.sql.SaveMode 
import org.apache.spark.streaming.Seconds 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.kafka.KafkaUtils 
import com.datastax.spark.connector._ 

import kafka.serializer.StringDecoder 
import org.apache.spark.rdd.RDD 
import com.datastax.spark.connector.SomeColumns 
import java.util.Formatter.DateTime 

object StreamProcessor extends Serializable { 
    def main(args: Array[String]): Unit = { 
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamProcessor") 
     .set("spark.cassandra.connection.host", "127.0.0.1") 

    val sc = new SparkContext(sparkConf) 

    val ssc = new StreamingContext(sc, Seconds(2)) 

    val sqlContext = new SQLContext(sc) 

    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") 

    val topics = args.toSet 

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
     ssc, kafkaParams, topics) 


     stream 
    .map { 
    case (_, msg) => 
     val result = msgParseMaster(msg) 
     (result.id, result.data) 
    }.foreachRDD(rdd => if (!rdd.isEmpty)  rdd.saveToCassandra("testKS","testTable",SomeColumns("id","data"))) 

     } 
    } 

    ssc.start() 
    ssc.awaitTermination() 

    } 

    import org.json4s._ 
    import org.json4s.native.JsonMethods._ 
    case class wordCount(id: Long, data1: String, data2: String) extends serializable 
    implicit val formats = DefaultFormats 
    def msgParseMaster(msg: String): wordCount = { 
    val m = parse(msg).extract[wordCount] 
    return m 

    } 

} 

回答

0

我已經執行了下面的步驟。 1)從原始的JSON字符串中提取細節並且與案例類別 2)創建超級JSON(具有兩個過濾標準所需的細節) 3)將該JSON轉換成數據幀 4)執行select和where子句那JSON 5)保存到Cassandra

1

我認爲你只是想使用兩次過濾功能。你可以不喜歡

val evenstream = stream.map { case (_, msg) => 
    val result = msgParseMaster(msg) 
    (result.id, result.data) 
}.filter{ k => 
    k._1 % 2 == 0 
} 

evenstream.foreachRDD{rdd=> 
    //Do something with even stream 
} 

val oddstream = stream.map { case (_, msg) => 
    val result = msgParseMaster(msg) 
    (result.id, result.data) 
}.filter{ k => 
    k._1 % 2 == 1 
} 

oddstream.foreachRDD{rdd=> 
    //Do something with odd stream 
} 

的東西時,我做了一些類似的一個項目here我兩次使用的過濾器功能,如果你往下看近線191.我是分類和基於他們之間的價值保存的元組0和1,所以隨時檢查出來。

+0

謝謝。我用一次過濾以不同的方式解決了這個問題。 – Suresh

+0

嘿,你介意發佈你的解決方案,所以你可以把它標記爲正確的答案?我也想看到它:P –

+0

今天肯定會這樣做。只是列出了過程。我已經完成了以下步驟。 1)從原始JSON字符串和案例類中提取細節2)創建超級JSON(具有兩個過濾條件所需的細節)3)將該JSON轉換爲DataFrame 4)在該JSON上執行select和where子句5)保存到cassandra – Suresh