2016-07-01 32 views
1

提取我有下面這段代碼:電梯JSON的從JSON對象

object Test { 
    def main(args: Array[String]) { 
      val sparkConf = new SparkConf().setAppName("Spark").setMaster("local[2]") 
      val sc = new SparkContext(sparkConf) 

      val ssc = new StreamingContext(sc, Seconds(3)) 
      val kafkaBrokers = Map("metadata.broker.list" -> "HostName:9092") 
      val offsetMap = Map(TopicAndPartition("topic_test", 0), 8) 
      val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaBrokers, offsetMap) 

var offsetArray = Array[OffsetRange]() 
       lines.transform {rdd => 
         offsetArray = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
         rdd 
       }.map { 
         _.message() 
       }.foreachRDD {rdd => 
        /* NEW CODE */ 
       } 
       ssc.start() 
       ssc.awaitTermination() 
    } 
} 

我已經添加了新的代碼烏德註釋/* NEW CODE */。我的問題是lines val將包含一系列RDD,它們每隔3秒基本形成一次kafka服務器。然後我使用地圖功能來抓取消息。

但我對foreachRDD函數的功能有點困惑。這是否遍歷所有RDD's這是lines DStream(這正是我想要做的)?事情是從lift-json庫解析函數只接受一個字符串,所以我需要迭代所有的rdd和傳遞該字符串值的解析函數,這是我試圖做的。 但是由於某種原因沒有任何東西正在打印出來。

回答

1

如果您想讀取特定偏移量的數據,那麼您使用的是過載錯誤。

。你需要的是這樣的:

createDirectStream[K, 
        V, 
        KD <: Decoder[K], 
        VD <: Decoder[V], R] 
        (ssc: StreamingContext, 
        kafkaParams: Map[String, String], 
        fromOffsets: Map[TopicAndPartition, Long], 
        messageHandler: (MessageAndMetadata[K, V]) ⇒ R): InputDStream[R] 

你需要一個Map[TopicAndPartition, Long]

val offsetMap = Map(TopicAndPartition("topic_test", 0), 8L) 

而且你需要傳遞,接收MessageAndMetadata[K, V],並返回你所需的類型,例如功能:

val extractKeyValue: MessageAndMetadata[String, String] => (String, String) = 
     msgAndMeta => (msgAndMeta.key(), msgAndMeta.message()) 

並使用它:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder] 
(ssc, kafkaBrokers, offsetMap, extractKeyValue) 
+0

Omg非常感謝你!只是一個問題。在這一行中:'val extractKeyValue:MessageAndMetadata [String,String] =>(String,String)= msgAndMeta =>(msgAndMeta.key(),msgAndMeta.message())'msgAndMeta.key() msgAndMeta.message()'返回像元組?它是「(TOPIC_NAME,MESSAGE)」嗎? – CapturedTree

+1

@ 1290當您向Kafka插入記錄時,您可以指定一個鍵和一個值。所以'key()'是關鍵字,'message()'是值。這個方法爲它們創建一個「Tuple2」。 –

+0

哦,好吧,我明白了。您能否回答我關於此流式作業工作流程的另一個問題。因此,本質上我的火花流上下文將來自我在我的'kafkaParams'映射中指定的kafka代理的數據,從'0'分區開始,'offsetMap'中指定的'8th'偏移量。所以現在我的問題是:每3秒鐘它會運行那個代碼我添加到我的問題下的新代碼註釋請參閱這裏? – CapturedTree