2017-08-28 165 views
2

我正在寫一個Spark(v2.2)批處理作業,它從Kafka主題讀取。 Spark作業使用cron進行調度。 我不能使用Spark Structured Streaming,因爲不支持非基礎時間窗口。使用Apache Spark批處理Apache Kafka的偏移管理

val df = spark 
     .read 
     .format("kafka") 
     .option("kafka.bootstrap.servers", "...") 
     .option("subscribe", s"kafka_topic") 

我需要設置的偏移量卡夫卡話題從哪裏開始下一個批處理作業就知道了。我怎樣才能做到這一點?

回答

1

我想你正在使用KafkaUtils來創建流,你可以傳遞這個參數。

val inputDStream = KafkaUtils.createDirectStream[String,String](ssc,PreferConsistent, 
          Assign[String, String](fromOffsets.keys,kafkaParams,fromOffsets)) 

希望這有助於!

+0

反對 接受 對於批量查詢,不允許使用最新的偏移量。 https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html 我必須傳遞給Spark Streaming才能從最後一次查詢停止的位置恢復新的查詢。 – ngi

相關問題