2016-03-03 34 views
3

背景: 我使用Spark Streaming從Kafka流式處理以逗號分隔鍵值對形式出現的事件 以下是事件流入我的火花應用程序。Spark Streaming - 根據按鍵分組的鍵值對計算統計信息

Key1=Value1, Key2=Value2, Key3=Value3, Key4=Value4,responseTime=200 
Key1=Value5, Key2=Value6, Key3=Value7, Key4=Value8,responseTime=150 
Key1=Value9, Key2=Value10, Key3=Value11, Key4=Value12,responseTime=100 

輸出

我想要計算不同的度量(平均,計數等)由不同的密鑰流中分組的對於給定批次的時間間隔例如

  1. 通過密鑰1,密鑰2(RESPONSETIME是在每一個事件的關鍵之一)
  2. 計數通過密鑰1,密鑰2

我嘗試到目前爲止平均RESPONSETIME:

val stream = KafkaUtils 
    .createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topicsSet) 

val pStream = stream.persist() 

val events: DStream[String] = pStream.flatMap(_._2.split(",")) 
val pairs= events.map(data => data.split("=")).map(array => (array(0), array(1))) 
// pairs results in tuples of (Key1, Value1), (Key2, Value2) and so on. 

更新 - 03/04 鑰匙Key1,Key2 ...可能在傳入流中亂序出現。

欣賞您的輸入/提示。

+0

您能否提供預期的輸出類型和您迄今爲止的嘗試?這不是你想要的東西。 – zero323

+0

@ zero323 - 好吧,我用期望的輸出類型和我迄今爲止的嘗試更新了我的問題。如果我能夠解決這個問題,我會發布答案。 – codehammer

+1

謝謝。我對這一點進行了格式化。 – zero323

回答

2

一個可行的辦法是這樣的:

  • 創建代表每個記錄的情況下類,所以我們沒有處理的元組:

    case class Record(
        key1: String, key2: String, key3: String, key4: String, rt: Double) 
    
  • 使用正則表達式解析記錄,丟棄殘缺的條目:

    import scala.util.matching.Regex 
    
    val recordPattern = new Regex(
        "^Key1=(.*?), ?Key2=(.*?), ?Key3=(.*?), ?Key4=(.*?), ?" ++ 
        "responseTime=(0-9+)$" 
    ) 
    
    val records = pStream.map { 
        case recordPattern(key1, key2, key3, key4, rt) => 
        Some(Record(key1, key2, key3, key4, rt.toDouble)) 
        case _ => None 
    }.flatMap(x => x) // Drop malformed 
    
  • 重塑數據鍵值對:

    val pairs = records.map(r => ((r.key1, r.key2), r.rt)) 
    
  • 創建一個分區,並使用StatCounter來彙總統計:

    import org.apache.spark.util.StatCounter 
    import org.apache.spark.HashPartitioner 
    
    val paritioner: HashPartitioner = ??? 
    
    pairs.combineByKey[StatCounter](
        StatCounter(_), _ merge _, _ merge _, paritioner 
    ) 
    
  • 的興趣提取物領域:

    stats.mapValues(s => (s.count, s.mean)) 
    

您也可以嘗試像這樣的無序數據,雖然我強烈建議修復th在上游:

val kvPattern = "(\\w+)=(\\w+)".r 
val pairs = pStream.map(line => { 
    val kvs = kvPattern.findAllMatchIn(line) 
    .map(m => (m.group(1), m.group(2))).toMap 

    // This will discard any malformed lines 
    // (lack of key1, key2, lack or invalid format of responseTime) 
    Try((
    (kvs("Key1"), kvs("Key2")), 
    kvs("responseTime").toDouble 
)) 

}).flatMap(_.toOption) 

並且按照以前那樣繼續。

+0

謝謝+1。這可能工作。唯一的警告,我可能忘記在我的問題中提到的是key1,key2可能不一定每次都在相同的序列中,所以正則表達式不匹配。我正在考慮應用更通用的解決方案,因此需要通過關鍵值元組來查找配置的key1,key2。想法? – codehammer

+0

使用StatsCounter也很棒。 – codehammer

+1

然後,你必須解析這個其他的方式。將鍵值對提取到「Map」中應該沒問題。 – zero323