2017-09-27 96 views
0

我從火花流中獲取數百萬來自卡夫卡流的消息。有15種不同類型的消息。消息來自單個主題。我只能通過其內容區分消息。所以我使用rdd.contains方法來獲取不同類型的rdd。是火花斯卡拉rdd.contains功能昂貴

示例消息

{ 「一個」: 「foo」 的, 「B」: 「棒」, 「類型」: 「第一」 .......}
{ 「一個」:」 {「a」:「foo2」,「b」:「bar2」,「type」:「foo1」,「b」:「bar1」,「type」:「second」.......
「第三個「.......}
{」a「:」foo「,」b「:」bar「,」type「:」first「.......}
.... ..........
...............
.........

代碼

DStream.foreachRDD { rdd => 
    if (!rdd.isEmpty()) { 
    val rdd_first = rdd.filter { 
     ele => ele.contains("First") 
    } 
    if (!rdd_first.isEmpty()) { 
     insertIntoTableFirst(hivecontext.read.json(rdd_first)) 
    } 
    val rdd_second = rdd.filter { 
     ele => ele.contains("Second") 
    } 
    if (!rdd_second.isEmpty()) { 
    insertIntoTableSecond(hivecontext.read.json(rdd_second)) 
    } 
     ............. 
     ...... 
    same way for 15 different rdd 

有沒有什麼辦法讓卡夫卡主題消息不同RDD?

回答

1

沒有rdd.contains。此處使用的函數contains適用於RDD中的String

喜歡這裏:

val rdd_first = rdd.filter { 
    element => element.contains("First") // each `element` is a String 
} 

這種方法並不十分可靠,因爲在字符串其他內容可能遇到的比較,產生的誤差。

例如

{"a":"foo", "b":"bar","type":"second", "c": "first", .......} 

一個處理這種方式是首先將JSON數據轉換成適當的記錄,再申請對這些記錄進行分組或過濾邏輯。爲此,我們首先需要數據的模式定義。有了模式,我們可以分析記錄成JSON和最重要的是將任何處理:

case class Record(a:String, b:String, `type`:String) 

import org.apache.spark.sql.types._ 
val schema = StructType(
       Array(
       StructField("a", StringType, true), 
       StructField("b", StringType, true), 
       StructField("type", String, true) 
       ) 
      ) 

val processPerType: Map[String, Dataset[Record] => Unit ] = Map(...) 

stream.foreachRDD { rdd => 
    val records = rdd.toDF("value").select(from_json($"value", schema)).as[Record] 
    processPerType.foreach{case (tpe, process) => 
     val target = records.filter(entry => entry.`type` == tpe) 
     process(target) 
    } 
} 

的問題沒有明確規定需要什麼樣的邏輯被應用到每種類型的記錄。這裏給出的是一種解決問題的通用方法,其中任何定製邏輯可以表示爲函數Dataset[Record] => Unit

如果邏輯可以表示爲一個聚合,可能Dataset聚合函數將更合適。

+0

我必須將數據存儲在配置單元中。在配置單元中創建了15個不同的表。更新的問題。實際上,單一類型的JSON中有超過50列。所以我必須創建15個案例類。有沒有其他的,而不是創建案例類? –

+0

@KishoreKumarSuthar在數據與'case class'(按照Spark術語)'結構化'之後,您可以對數據進行投影以匹配特定表格(val tableProjection1 = records select($「column」,$ 「列」,...)其中($「type」=== ...)' – maasg