2017-07-07 27 views
1

關於我的火花流程程序流程的小問題。我的火花流程程序的流程

我有這樣的功能:

​​

裏面居然拆一個「好」消息分爲多個字符串,並且,如果字符串是「壞」,返回一個空序列。

我讀從卡夫卡主題的消息,我想解析的結果發送到兩個不同的主題: 如果消息是「好」,發送解析主題「good_msg_topic的結果「 如果消息是‘壞’,發送‘壞’消息主題‘bad_msg_topic’

爲了實現這個目標,我這樣做:

stream.foreachRDD(rdd => { 
    val res = rdd.map(msg => msg.value() -> parse(msg.value())) 

    res.foreach(pair => { 
    if (pair._2.isEmpty) { 
     producer.send(junkTopic, pair._1) 
    } else { 
     pair._2.foreach(m => producer.send(splitTopic, m)) 
    } 
    }) 
}) 

不過,我覺得這是不是最佳。使用映射對象將原始消息關聯到結果可能會減慢過程...

我以Spark和Scala開始,所以我認爲可以做得更好。

關於如何改善這一點的任何想法?如果您認爲它更好,也可以更改解析函數的簽名。

謝謝

+1

*使用映射對象將原始消息關聯到結果可能會減慢進程*。你究竟在擔心什麼?你衡量了性能,發現這是一個瓶頸? –

回答

3

如果您尚未測量,發現這個瓶頸,我也不會太在意有關性能。

有一件事我能想到的可能使代碼更清晰的是使用ADT來形容結果類型:

sealed trait Result 
case class GoodResult(seq: Seq[String]) extends Result 
case class BadResult(original: String) extends Result 

parseResult

def parse(s: String): Result 

然後用mapDStream而不是RDD

stream 
.map(msg => parse(msg.value()) 
.foreachRDD { rdd => 
    rdd.foreach { result => 
    result match { 
     case GoodResult(seq) => seq.foreach(value => producer.send(splitTopic, value)) 
     case BadResult(original) => producer.send(junkTopic, original) 
    } 
    } 
}