2017-03-15 13 views
0

我想組每個主題或應用發來的郵件,其主題知道:的MessageHandler在KafkaUtils010 SparkStreaming

val stream = KafkaUtils.createDirectStream[String, String](
    ssc, 
    PreferConsistent, 
    Subscribe[String, String](
     Array(topicConfig.srcTopic), 
     kafkaParameters(BOOTSTRAP_SERVERS,"kafka_test_group_id)) 
    ) 
) 
在最新的API kafka010

但是似乎不支持的消息處理程序,在以前的版本。有關如何獲得該主題的任何想法?

我的目標是從N個話題消耗處理它們(中視主題不同的方式),然後將其推回給另外N話題的1:主題1映射:

SrcTopicA--> Process --> DstTopicA 
SrcTopicB--> Process --> DstTopicB 
SrcTopicC--> Process --> DstTopicC 

但有一些需要共享的屬性(變化很大,所以不可能使用廣播變量)。因此,所有主題都需要在同一個火花作業中消耗。當您在使用0.10 createDirectStream

stream.filter(cr => cr.topic) 

回答

0

你得到一個ConsumerRecord

0

可以過濾使用topic這樣流。該記錄具有topic值。您可以創建主題和值的元組:

val stream: InputDStream[ConsumerRecord[String, String]] = 
    KafkaUtils.createDirectStream[String, String](
    streamingContext, 
    PreferConsistent, 
    Subscribe[String, String](topics, kafkaParams) 
) 

val res: DStream[(String, String)] = stream.map(record => (record.topic(), record.value())) 
相關問題