我從火花流中獲取數百萬來自卡夫卡流的消息。有15種不同類型的消息。消息來自單個主題。我只能通過其內容區分消息。所以我使用rdd.contains方法來獲取不同類型的rdd。是火花斯卡拉rdd.contains功能昂貴
示例消息
{ 「一個」: 「foo」 的, 「B」: 「棒」, 「類型」: 「第一」 .......}
{ 「一個」:」 {「a」:「foo2」,「b」:「bar2」,「type」:「foo1」,「b」:「bar1」,「type」:「second」.......
「第三個「.......}
{」a「:」foo「,」b「:」bar「,」type「:」first「.......}
.... ..........
...............
.........
等
代碼
DStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val rdd_first = rdd.filter {
ele => ele.contains("First")
}
if (!rdd_first.isEmpty()) {
insertIntoTableFirst(hivecontext.read.json(rdd_first))
}
val rdd_second = rdd.filter {
ele => ele.contains("Second")
}
if (!rdd_second.isEmpty()) {
insertIntoTableSecond(hivecontext.read.json(rdd_second))
}
.............
......
same way for 15 different rdd
有沒有什麼辦法讓卡夫卡主題消息不同RDD?
我必須將數據存儲在配置單元中。在配置單元中創建了15個不同的表。更新的問題。實際上,單一類型的JSON中有超過50列。所以我必須創建15個案例類。有沒有其他的,而不是創建案例類? –
@KishoreKumarSuthar在數據與'case class'(按照Spark術語)'結構化'之後,您可以對數據進行投影以匹配特定表格(val tableProjection1 = records select($「column」,$ 「列」,...)其中($「type」=== ...)' – maasg