我有一個從Kafka主題中提取消息的系統,當它由於某些外部資源不可用而無法處理消息時,它會關閉使用者,將消息返回給主題並等待有一段時間再次啓動消費者。唯一的問題是,關閉不起作用。下面是我在日誌中看到:如何關閉Kafka ConsumerConnector
2014年9月30日08:24:10,918 - com.example.kafka.KafkaConsumer [信息] - [應用akka.actor.workflow上下文-8]關閉關閉消費者關閉 2014-09-30 08:24:10,927 - clients.kafka.ProblemReportObserver [info] - [application-akka.actor.workflow-context-8]消費者關閉 2014-09 -30 08:24:11,946 - clients.kafka.ProblemReportObserver [warn] - [application-akka.actor.workflow-context-8]發送7410-1412090624000返回隊列 2014-09-30 08:24:12,021 - clients.kafka.ProblemReportObserver [debug] - [kafka-akka.actor.kafka-consumer-worker-context-9]來自分區0的消息:key = 7410-1412090624000,msg = 7410-1412090624000
有工作幾層這裏,但重要的代碼是:
在KafkaConsumer.scala
:
protected def consumer: ConsumerConnector = Consumer.create(config.asKafkaConfig)
def shutdown() = {
logger.info(s"Shutting down kafka consumer for topic ${config.topic}")
consumer.shutdown()
}
在這種觀察的消息常規:
(processor ? ProblemReportRequest(problemReportKey)).map {
case e: ConnectivityInterruption =>
val backoff = 10.seconds
logger.warn(s"Can't connect to essential services, pausing for $backoff", e)
stop()
// XXX: Shutdown isn't instantaneous, so returning has to happen after a delay.
// Unfortunately, there's still a race condition here, plus there's a chance the
// system will be shut down before the message has been returned.
system.scheduler.scheduleOnce(100 millis) { returnMessage(message) }
system.scheduler.scheduleOnce(backoff) { start() }
false
case e: Exception => returnMessage(message, e)
case _ => true
}.recover { case e => returnMessage(message, e) }
和停止方法:
def stop() = {
if (consumerRunning.get()) {
consumer.shutdown()
consumerRunning.compareAndSet(true, false)
logger.info("Consumer shutdown")
} else {
logger.info("Consumer is already shutdown")
}
!consumerRunning.get()
}
這是一個錯誤,還是我做錯了?