2017-12-18 123 views
0

Spark和Scala的新手。試圖達到以下。我的消息看起來像以下(鑰匙,ID,版本,dataObject時)星火羣組rdd由配對RDD上的鑰匙和羣組組成,並從每個羣組中挑選最新的

val transformedRDD = processedMessages.flatMap(message => { 
    message.isProcessed match { 
     case true => Some(message.key, message.id, message.version, message) 
     case false => None 
    } 
    }).groupByKey 

我想組由ID對每個消息並獲得最新版本的消息,然後groupbykey,然後調用它看起來像下面

預定方法
Ingest(key,RDD[dataObject]) 
+0

這並不回答你的問題,但也許會幫助你選擇正確的模塊,以滿足您的需求。爲什麼你使用Spark Streaming?如果你是_「新的spark和scala。」_ ?! –

+0

我在問題本身中沒有看到任何Spark Stream參考。 –

回答

0

在大多數情況下,您應該避免groupByKey,因爲它可能導致重新洗牌,這可能非常昂貴。在您的使用案例中,您不需要groupByKey,而是可以使用reduceByKey

val transformedRDD = processedMessages 
    // notice that we will have Rdd[(String, Message)] or PairRdd after this flatMap 
    .flatMap(message => message.isProcessed match { 
    case true => Some((message.id, message)) 
    case false => None 
    }) 
    // after this reduction we will have latest message for each id 
    .reduceByKey((m1: Message, m2: Message) => m1.version >= m2.version match { 
    case true => m1 
    case false => m2 
    }) 
    // now we just want to keep message 
    .map({ case (id, message) => message }) 
+0

感謝您的信息...我必須做一步,除了找到每個id的最新版本,我必須按上面提到的鍵來分組,以收集所有rdds去單個表 – user2526641

+0

Rdd去單個表嗎? ??那是什麼意思?哪把鑰匙? –

+0

對不起。我知道如何寫。非常感謝 – user2526641