2014-09-30 73 views
1

我有一個從Kafka主題中提取消息的系統,當它由於某些外部資源不可用而無法處理消息時,它會關閉使用者,將消息返回給主題並等待有一段時間再次啓動消費者。唯一的問題是,關閉不起作用。下面是我在日誌中看到:如何關閉Kafka ConsumerConnector

2014年9月30日08:24:10,918 - com.example.kafka.KafkaConsumer [信息] - [應用akka.actor.workflow上下文-8]關閉關閉消費者關閉 2014-09-30 08:24:10,927 - clients.kafka.ProblemReportObserver [info] - [application-akka.actor.workflow-context-8]消費者關閉 2014-09 -30 08:24:11,946 - clients.kafka.ProblemReportObserver [warn] - [application-akka.actor.workflow-context-8]發送7410-1412090624000返回隊列 2014-09-30 08:24:12,021 - clients.kafka.ProblemReportObserver [debug] - [kafka-akka.actor.kafka-consumer-worker-context-9]來自分區0的消息:key = 7410-1412090624000,msg = 7410-1412090624000

有工作幾層這裏,但重要的代碼是:

KafkaConsumer.scala

protected def consumer: ConsumerConnector = Consumer.create(config.asKafkaConfig) 
def shutdown() = { 
    logger.info(s"Shutting down kafka consumer for topic ${config.topic}") 
    consumer.shutdown() 
} 

在這種觀察的消息常規:

(processor ? ProblemReportRequest(problemReportKey)).map { 
    case e: ConnectivityInterruption => 
    val backoff = 10.seconds 
    logger.warn(s"Can't connect to essential services, pausing for $backoff", e) 
    stop() 
    // XXX: Shutdown isn't instantaneous, so returning has to happen after a delay. 
    // Unfortunately, there's still a race condition here, plus there's a chance the 
    // system will be shut down before the message has been returned. 
    system.scheduler.scheduleOnce(100 millis) { returnMessage(message) } 
    system.scheduler.scheduleOnce(backoff) { start() } 
    false 
    case e: Exception => returnMessage(message, e) 
    case _ => true 
}.recover { case e => returnMessage(message, e) } 

和停止方法:

def stop() = { 
    if (consumerRunning.get()) { 
    consumer.shutdown() 
    consumerRunning.compareAndSet(true, false) 
    logger.info("Consumer shutdown") 
    } else { 
    logger.info("Consumer is already shutdown") 
    } 
    !consumerRunning.get() 
} 

這是一個錯誤,還是我做錯了?

回答

1

因爲您的consumerdef。它會創建一個新的Kafka實例,並在您稱其爲consumer.shutdown()時關閉該新實例。改爲使用consumer a val