我正在嘗試使用Spark Streaming來使用Kafka主題的特定分區。Spark Streaming - 是否可以使用卡夫卡主題的特定分區?
我在KafkaUtils類中看不到此用例的任何方法。
有一種方法稱爲createRDD
,基本上預計offsets
,它只適用於非流式應用程序。是否有任何其他方式可以使用Spark Streaming消費特定的Kafka主題分區?
我正在嘗試使用Spark Streaming來使用Kafka主題的特定分區。Spark Streaming - 是否可以使用卡夫卡主題的特定分區?
我在KafkaUtils類中看不到此用例的任何方法。
有一種方法稱爲createRDD
,基本上預計offsets
,它只適用於非流式應用程序。是否有任何其他方式可以使用Spark Streaming消費特定的Kafka主題分區?
沒有辦法消耗單個分區,我們可以使用的最細粒度的是一個主題。但是,有一種方法可以指定說給定的消息來自特定的分區。您可以在使用createDirectStream
的過載時執行此操作,該過載需要Function1[MessageAndMetadata, R]
。
例如,我們假設我們有一個類型爲String
的鍵和消息,並且我們當前只使用單個主題。我們可以這樣做:
val topicAndPartition: Map[TopicAndPartition, Long] = ???
val kafkaProperties: Map[String, String] = ???
KafkaUtils.createDirectStream[String,
String,
StringDecoder,
StringDecoder,
(String, String)](
streamingContext,
kafkaConfig.properties,
topicAndPartition,
(mam: MessageAndMetadata[String, String]) =>
(mam.partition, mam.message())
這樣,我輸出分區(1)的元組和基礎消息(2)。然後,我可以過濾這個DStream[(String, String)]
從一個特定的分區只包含消息:
val filteredStream = kafkaDStream.filter { case (partition, _) => partition == 4 }
如果我們從多個主題消費,我們需要輸出兩個主題,分區的元組,以過濾分區與正確的話題。幸運的是,我們可以使用一個名爲TopicAndPartition
的便利案例類。我們希望有:
(mam: MessageAndMetadata[String, String]) =>
(TopicAndPartition(mam.topic(), mam.partition()), mam.message())
然後:
val filteredStream = kafkaDStream.filter {
case (tap, _) => tap.topic == "mytopic" && tap.partition == 4
}