2016-10-13 56 views

回答

2

沒有辦法消耗單個分區,我們可以使用的最細粒度的是一個主題。但是,有一種方法可以指定說給定的消息來自特定的分區。您可以在使用createDirectStream的過載時執行此操作,該過載需要Function1[MessageAndMetadata, R]

例如,我們假設我們有一個類型爲String的鍵和消息,並且我們當前只使用單個主題。我們可以這樣做:

val topicAndPartition: Map[TopicAndPartition, Long] = ??? 
val kafkaProperties: Map[String, String] = ??? 

KafkaUtils.createDirectStream[String, 
           String, 
           StringDecoder, 
           StringDecoder, 
           (String, String)](
     streamingContext, 
     kafkaConfig.properties, 
     topicAndPartition, 
     (mam: MessageAndMetadata[String, String]) => 
      (mam.partition, mam.message()) 

這樣,我輸出分區(1)的元組和基礎消息(2)。然後,我可以過濾這個DStream[(String, String)]從一個特定的分區只包含消息:

val filteredStream = kafkaDStream.filter { case (partition, _) => partition == 4 } 

如果我們從多個主題消費,我們需要輸出兩個主題,分區的元組,以過濾分區與正確的話題。幸運的是,我們可以使用一個名爲TopicAndPartition的便利案例類。我們希望有:

(mam: MessageAndMetadata[String, String]) => 
    (TopicAndPartition(mam.topic(), mam.partition()), mam.message()) 

然後:

val filteredStream = kafkaDStream.filter { 
    case (tap, _) => tap.topic == "mytopic" && tap.partition == 4 
}