0

我的火花流的工作是從卡夫卡卡夫卡,因爲它需要大量的時間消耗只有新郵件

KafkaUtils.createStream(jssc, prop.getProperty(Config.ZOOKEEPER_QUORUM), 
         prop.getProperty(Config.KAFKA_CONSUMER_GROUP), topicMap); 

消耗數據,每當我重新啓動我的工作就開始消耗從去年偏移店(我假定這發送處理過的數據,如果我更改消費者組,它會立即使用新消息)

我是kafka 8.1.1其中auto.offset.reset默認爲最大,這意味着每當我重新啓動kafka將發送數據從哪裏我離開了。

我的使用案例要求我忽略這些數據並僅處理到達的數據。我怎樣才能做到這一點? 任何建議

回答

2

有兩種方法可以實現這一目標:

  1. 創建一個獨特的消費羣體在重新啓動每次,它會從最新的偏移消耗。

  2. 使用直接方法而不是基於接收器;在這裏你可以更好地控制你如何消費,但必須手動更新zookeeper來存儲你的偏移量。在下面的例子中,它將始終以最新的偏移量開始。

    import org.apache.spark.streaming.kafka._ 
    val topicsSet = topics.split(",").toSet 
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 
    

直接的方式文檔在這裏:https://spark.apache.org/docs/latest/streaming-kafka-integration.html

+0

我發現多了一個辦法星火1.5(我測試),使用卡夫卡直接的API和不使用檢查點。 –