我想組每個主題或應用發來的郵件,其主題知道:的MessageHandler在KafkaUtils010 SparkStreaming
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](
Array(topicConfig.srcTopic),
kafkaParameters(BOOTSTRAP_SERVERS,"kafka_test_group_id))
)
)
在最新的API kafka010
但是似乎不支持的消息處理程序,在以前的版本。有關如何獲得該主題的任何想法?
我的目標是從N個話題消耗處理它們(中視主題不同的方式),然後將其推回給另外N話題的1:主題1映射:
SrcTopicA--> Process --> DstTopicA
SrcTopicB--> Process --> DstTopicB
SrcTopicC--> Process --> DstTopicC
但有一些需要共享的屬性(變化很大,所以不可能使用廣播變量)。因此,所有主題都需要在同一個火花作業中消耗。當您在使用0.10 createDirectStream
stream.filter(cr => cr.topic)