
How do I identify the topic name from a message consumed from Kafka? That is, how do I get the topic from a Kafka message?

String[] topics = { "test", "test1", "test2" }; 
    for (String t : topics) { 
     topicMap.put(t, new Integer(3)); 
    } 

SparkConf conf = new SparkConf().setAppName("KafkaReceiver") 
      .set("spark.streaming.receiver.writeAheadLog.enable", "false") 
      .setMaster("local[4]") 
      .set("spark.cassandra.connection.host", "localhost"); 
    ; 
    final JavaSparkContext sc = new JavaSparkContext(conf); 
    JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(
      1000)); 

    /* Receive Kafka streaming inputs */ 
    JavaPairReceiverInputDStream<String, String> messages = KafkaUtils 
      .createStream(jssc, "localhost:2181", "test-group", 
        topicMap); 

    JavaDStream<MessageAndMetadata> data = 
      messages.map(new Function<Tuple2<String, String>, MessageAndMetadata>() 
      { 

       public MessageAndMetadata call(Tuple2<String, String> message) 
       { 
        System.out.println("message ="+message._2); 
        return null; 
       } 
      } 

     ); 

I can receive messages from the Kafka producer. But since the consumer now consumes three topics, I need to identify the topic name for each message.


I'm interested in the answer to this too. Did you find a way? –


@Arun: did you find a solution? If so, could you share it? Thanks! – jithinpt

Answers


Unfortunately, it's not straightforward, because the KafkaReceiver and ReliableKafkaReceiver in Spark's source code only store MessageAndMetadata.key and the message itself.

There are two open tickets related to this issue in Spark's JIRA; they have been open for a while.

In the meantime, here is a quick-and-dirty copy/paste/modify of Spark's source code to solve your issue:

package org.apache.spark.streaming.kafka 

import java.lang.{Integer => JInt} 
import java.util.{Map => JMap, Properties} 

import kafka.consumer.{KafkaStream, Consumer, ConsumerConfig, ConsumerConnector} 
import kafka.serializer.{Decoder, StringDecoder} 
import kafka.utils.VerifiableProperties 
import org.apache.spark.Logging 
import org.apache.spark.storage.StorageLevel 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} 
import org.apache.spark.streaming.dstream.ReceiverInputDStream 
import org.apache.spark.streaming.receiver.Receiver 
import org.apache.spark.streaming.util.WriteAheadLogUtils 
import org.apache.spark.util.ThreadUtils 
import scala.collection.JavaConverters._ 
import scala.collection.Map 
import scala.reflect._ 

object MoreKafkaUtils {

  def createStream(
      jssc: JavaStreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: JMap[String, JInt],
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): JavaReceiverInputDStream[(String, String, String)] = {
    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
      "zookeeper.connection.timeout.ms" -> "10000")
    val walEnabled = WriteAheadLogUtils.enableReceiverLog(jssc.ssc.conf)
    new KafkaInputDStreamWithTopic[String, String, StringDecoder, StringDecoder](
      jssc.ssc, kafkaParams, topics.asScala.mapValues(_.intValue()), walEnabled, storageLevel)
  }
}

private[streaming]
class KafkaInputDStreamWithTopic[
    K: ClassTag,
    V: ClassTag,
    U <: Decoder[_]: ClassTag,
    T <: Decoder[_]: ClassTag](
    @transient ssc_ : StreamingContext,
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    useReliableReceiver: Boolean,
    storageLevel: StorageLevel
) extends ReceiverInputDStream[(K, V, String)](ssc_) with Logging {

  def getReceiver(): Receiver[(K, V, String)] = {
    if (!useReliableReceiver) {
      new KafkaReceiverWithTopic[K, V, U, T](kafkaParams, topics, storageLevel)
    } else {
      // ReliableKafkaReceiverWithTopic (not shown here) needs the same adaptation.
      new ReliableKafkaReceiverWithTopic[K, V, U, T](kafkaParams, topics, storageLevel)
    }
  }
}

private[streaming]
class KafkaReceiverWithTopic[
    K: ClassTag,
    V: ClassTag,
    U <: Decoder[_]: ClassTag,
    T <: Decoder[_]: ClassTag](
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    storageLevel: StorageLevel
) extends Receiver[(K, V, String)](storageLevel) with Logging {

  // Connection to Kafka
  var consumerConnector: ConsumerConnector = null

  def onStop() {
    if (consumerConnector != null) {
      consumerConnector.shutdown()
      consumerConnector = null
    }
  }

  def onStart() {
    logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))

    // Kafka connection properties
    val props = new Properties()
    kafkaParams.foreach(param => props.put(param._1, param._2))

    val zkConnect = kafkaParams("zookeeper.connect")
    // Create the connection to the cluster
    logInfo("Connecting to Zookeeper: " + zkConnect)
    val consumerConfig = new ConsumerConfig(props)
    consumerConnector = Consumer.create(consumerConfig)
    logInfo("Connected to " + zkConnect)

    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
      .newInstance(consumerConfig.props)
      .asInstanceOf[Decoder[K]]
    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
      .newInstance(consumerConfig.props)
      .asInstanceOf[Decoder[V]]

    // Create threads for each topic/message stream we are listening to
    val topicMessageStreams = consumerConnector.createMessageStreams(
      topics, keyDecoder, valueDecoder)

    val executorPool =
      ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
    try {
      // Start the messages handler for each partition
      topicMessageStreams.values.foreach { streams =>
        streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
      }
    } finally {
      executorPool.shutdown() // Just causes threads to terminate after work is done
    }
  }

  // Handles Kafka messages
  private class MessageHandler(stream: KafkaStream[K, V])
    extends Runnable {
    def run() {
      logInfo("Starting MessageHandler.")
      try {
        val streamIterator = stream.iterator()
        while (streamIterator.hasNext()) {
          val msgAndMetadata = streamIterator.next()
          // The key change: store the topic as the third element of each record.
          store((msgAndMetadata.key, msgAndMetadata.message, msgAndMetadata.topic))
        }
      } catch {
        case e: Throwable => reportError("Error handling message; exiting", e)
      }
    }
  }
}
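For completeness, here is a rough sketch of how the adapted stream might be consumed from the question's Java code. The explicit StorageLevel argument is required because Scala default parameters are not visible from Java; the scala.Tuple3 element type follows from the (key, message, topic) records stored above:

import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.kafka.MoreKafkaUtils;
import scala.Tuple3;

// Each record is now (key, message, topic).
JavaReceiverInputDStream<Tuple3<String, String, String>> messages =
    MoreKafkaUtils.createStream(jssc, "localhost:2181", "test-group",
        topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());

JavaDStream<String> tagged = messages.map(
    new Function<Tuple3<String, String, String>, String>() {
        @Override
        public String call(Tuple3<String, String, String> record) {
            // record._3() is the topic the message was read from.
            return record._3() + ": " + record._2();
        }
    });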

You can also try the experimental KafkaUtils.createDirectStream, which takes a messageHandler: JFunction[MessageAndMetadata[K, V], R] as a parameter. –
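A minimal sketch of that variant follows, with some assumptions: a broker at localhost:9092, a single partition 0 per topic, and starting from offset 0 (this overload requires explicit starting offsets):

import java.util.HashMap;
import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092"); // assumed broker address

// The messageHandler overload requires explicit starting offsets per partition.
Map<TopicAndPartition, Long> fromOffsets = new HashMap<TopicAndPartition, Long>();
for (String t : new String[] { "test", "test1", "test2" }) {
    fromOffsets.put(new TopicAndPartition(t, 0), 0L); // assumes a single partition 0
}

@SuppressWarnings("unchecked")
Class<Tuple2<String, String>> recordClass =
    (Class<Tuple2<String, String>>) (Class<?>) Tuple2.class;

JavaInputDStream<Tuple2<String, String>> stream = KafkaUtils.createDirectStream(
    jssc,
    String.class, String.class,
    StringDecoder.class, StringDecoder.class,
    recordClass,
    kafkaParams,
    fromOffsets,
    new Function<MessageAndMetadata<String, String>, Tuple2<String, String>>() {
        @Override
        public Tuple2<String, String> call(MessageAndMetadata<String, String> mmd) {
            // The handler sees the full metadata, including the topic name.
            return new Tuple2<String, String>(mmd.topic(), mmd.message());
        }
    });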


Starting with the recent Spark 1.5.0 release, the official documentation encourages the receiver-less/direct approach, which graduated from experimental status as of 1.5.0. This new Direct API lets you easily obtain a message together with its metadata, apart from other goodies.


I'm using the direct approach and don't understand how to get the message metadata. Can you elaborate? –


@BrandonBradley, follow the link above to the last code snippet in the official documentation. Basically, you have to cast the RDD to HasOffsetRanges as soon as you get it. –
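For illustration, a minimal sketch of that pattern against the Spark 1.5-era API; the directStream variable name is an assumption for the JavaPairInputDStream returned by createDirectStream:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

directStream.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    @Override
    public Void call(JavaPairRDD<String, String> rdd) {
        // The cast only works on the RDD created by createDirectStream itself,
        // before any transformation that changes the partitioning.
        OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        for (OffsetRange range : ranges) {
            System.out.println("topic=" + range.topic()
                + " partition=" + range.partition()
                + " offsets=[" + range.fromOffset() + ", " + range.untilOffset() + ")");
        }
        return null;
    }
});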