2017-04-05 150 views
3

我使用Spark Streaming從Kafka主題列表中讀取。 我正在關注此link的官方API。我正在使用的方法是:Spark Streaming + Kafka:如何從kafka消息檢查主題名稱

val kafkaParams = Map("metadata.broker.list" -> configuration.getKafkaBrokersList(), "auto.offset.reset" -> "largest") 
val topics = Set(configuration.getKafkaInputTopic()) 
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics) 

我想知道執行器如何從主題列表中讀取消息?他們的政策是什麼?他們會閱讀一個主題,然後當他們完成消息傳遞給其他主題?

最重要的是,我怎麼能在調用這個方法後,檢查RDD中的消息的主題是什麼?

stream.foreachRDD(rdd => rdd.map(t => { 
     val key = t._1 
     val json = t._2 
     val topic = ??? 
}) 
+0

您可以使用地圖像VAR記錄= stream.map(記錄=>(record.topic)) –

+0

@ israel.zinc我認爲'stream'中的元素是'Tuple2 [String,String]'。有沒有參數或方法稱爲主題 – salvob

回答

0

我想知道如何將消息從主題的 列表讀取執行?他們的政策是什麼?他們會讀取一個主題,然後當他們完成消息傳遞給其他主題?

在直接流式方法中,驅動程序負責讀取要使用的卡夫卡主題的偏移量。它的作用是在主題,分區和需要閱讀的偏移量之間創建一個映射。發生這種情況後,司機會將每個工人分配給範圍以讀入特定的Kafka主題。這意味着,如果一個工作人員可以同時運行2個任務(就這個例子而言,它通常可以運行更多),那麼它可能會同時從兩個不同的卡夫卡主題中讀取。

如何在調用此方法後檢查RDD中的消息 的主題是什麼?

您可以使用createDirectStream超載,這需要MessageHandler[K, V]

val topicsToPartitions: Map[TopicAndPartition, Long] = ??? 

val stream: DStream[(String, String)] = 
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
     ssc, 
     kafkaParams, 
     topicsToPartitions, 
     mam: MessageAndMetadata[String, String]) => (mam.topic(), mam.message()) 
+0

謝謝@Yuval,但仍然。如何在閱讀卡夫卡時訪問消息和主題。 'messageHandler'作爲'createDirectStream'的參數,它看起來不能。 – salvob

+0

@salvob我的代碼片段正是如此。輸出將是一個'DStream [(String,String)]',其中第一個是主題名稱。 –

+0

您的代碼定義了一個可能包含每條記錄的消息和主題的流。 但是,當我嘗試打印元組的內容時(在我的問題中帶有'println(key + topic + message)'的代碼片段),沒有任何事情發生,方法'rdd.count()'返回消息的數量正確雖然 – salvob

相關問題