Akka Streams Reactive Kafka - OutOfMemoryError under high load

2017-09-27

I am running an Akka Streams Reactive Kafka application that is supposed to work under heavy load. After running the application for about 10 minutes, it goes down with an OutOfMemoryError. I tried to debug the heap dump and found that akka.dispatch.Dispatcher is taking up ~5 GB of memory. Below are my configuration files.

Akka version: 2.4.18

Reactive Kafka version: 2.4.18

1. application.conf

consumer {
  num-consumers = "2"
  c1 {
    bootstrap-servers = "localhost:9092"
    bootstrap-servers = ${?KAFKA_CONSUMER_ENDPOINT1}
    groupId = "testakkagroup1"
    subscription-topic = "test"
    subscription-topic = ${?SUBSCRIPTION_TOPIC1}
    message-type = "UserEventMessage"
    poll-interval = 100ms
    poll-timeout = 50ms
    stop-timeout = 30s
    close-timeout = 20s
    commit-timeout = 15s
    wakeup-timeout = 10s
    use-dispatcher = "akka.kafka.default-dispatcher"
    kafka-clients {
      enable.auto.commit = true
    }
  }
}
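The actor code in section 3 reads this block through a config: Map[String, String]. The question does not show how that map is built, but a minimal hypothetical consumerConfig helper, assuming Typesafe Config, could look like this:

import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._

object ConsumerConfigLoader {
  private val root = ConfigFactory.load()

  // Flatten one consumer block (e.g. "consumer.c1") into a Map[String, String].
  // Nested keys such as kafka-clients."enable.auto.commit" appear as dotted paths.
  def consumerConfig(path: String): Map[String, String] = {
    val section = root.getConfig(path)
    section.entrySet().asScala
      .map(entry => entry.getKey -> entry.getValue.unwrapped().toString)
      .toMap
  }
}

// Usage: val config = ConsumerConfigLoader.consumerConfig("consumer.c1")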

2. Launch command

java -Xmx6g \ 
-Dcom.sun.management.jmxremote.port=27019 \ 
-Dcom.sun.management.jmxremote.authenticate=false \ 
-Dcom.sun.management.jmxremote.ssl=false \ 
-Djava.rmi.server.hostname=localhost \ 
-Dzookeeper.host=$ZK_HOST \ 
-Dzookeeper.port=$ZK_PORT \ 
-jar ./target/scala-2.11/test-assembly-1.0.jar 

3. Source and Sink actor:

import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.util.Timeout
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.concurrent.duration._

// Settings, StartUserEvent, consumerConfig, actorA/actorB/actorC and
// MessageProcessor are defined elsewhere in the application.
class EventStream extends Actor with ActorLogging {

  implicit val actorSystem = context.system
  implicit val timeout: Timeout = Timeout(10 seconds)
  implicit val materializer = ActorMaterializer()
  import context.dispatcher // execution context for future.onComplete

  val settings = Settings(actorSystem).KafkaConsumers

  override def receive: Receive = {
    case StartUserEvent(id) =>
      startStreamConsumer(consumerConfig("EventMessage" + ".c" + id))
  }

  def startStreamConsumer(config: Map[String, String]) = {
    val consumerSource = createConsumerSource(config)
    val consumerSink = createConsumerSink()
    val messageProcessor = startMessageProcessor(actorA, actorB, actorC)

    log.info("Starting The UserEventStream processing")

    val future = consumerSource.map { message =>
      val m = s"${message.record.value()}"
      messageProcessor ? m
    }.runWith(consumerSink)

    future.onComplete {
      case _ => actorSystem.stop(messageProcessor)
    }
  }

  def startMessageProcessor(actorA: ActorRef, actorB: ActorRef, actorC: ActorRef) =
    actorSystem.actorOf(Props(classOf[MessageProcessor], actorA, actorB, actorC))

  def createConsumerSource(config: Map[String, String]) = {
    val kafkaMBAddress = config("bootstrap-servers")
    val groupID = config("groupId")
    val topicSubscription = config("subscription-topic").split(',').toList
    println(s"Subscription topics $topicSubscription")

    val consumerSettings =
      ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
        .withBootstrapServers(kafkaMBAddress)
        .withGroupId(groupID)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")

    Consumer.committableSource(consumerSettings, Subscriptions.topics(topicSubscription: _*))
  }

  def createConsumerSink() = Sink.foreach(println)
}

In this scenario, actorA, actorB, and actorC are doing business-logic processing and database interaction. Am I missing anything in the handling of the Akka Reactive Kafka consumers, such as commit, error, or throttling configuration? Looking at the heap dump, my guess is that messages are piling up. A hypothetical sketch of the missing MessageProcessor follows this paragraph.
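For the ask pattern above to complete, MessageProcessor has to reply to its sender; otherwise every ask simply times out after the 10-second Timeout. The question does not show that actor, but a minimal hypothetical shape would be:

import akka.Done
import akka.actor.{Actor, ActorLogging, ActorRef}

// Hypothetical sketch; the real MessageProcessor is not shown in the question.
class MessageProcessor(actorA: ActorRef, actorB: ActorRef, actorC: ActorRef)
  extends Actor with ActorLogging {

  override def receive: Receive = {
    case m: String =>
      // ... business logic and database interaction via actorA, actorB, actorC ...
      sender() ! Done // reply so the ask future on the stream side can complete
  }
}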

Answer


One thing I would change is the following:

val future = consumerSource.map { message => 
    val m = s"${message.record.value()}" 
    messageProcessor ? m 
}.runWith(consumerSink) 

In the above code you are using ask to send a message to the messageProcessor actor and expect a response, but for ask to work as a backpressure mechanism you need to use it together with mapAsync (more information in the documentation). With a plain map, the stream demands the next element as soon as each ask is fired, so the unawaited reply futures (and the temporary actors that ask creates internally) pile up without bound, which is consistent with the dispatcher-dominated heap dump. Something like the following:

val future =
  consumerSource
    .mapAsync(parallelism = 5) { message =>
      val m = s"${message.record.value()}"
      messageProcessor ? m
    }
    .runWith(consumerSink)

Adjust the parallelism as needed. mapAsync keeps at most that many asks in flight and backpressures the Kafka source until the outstanding futures complete.
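On the commit question: with enable.auto.commit = true, the Kafka client commits offsets on its own schedule, whether or not the actors have finished processing. A sketch of committing only after processing, reusing the consumerSource and messageProcessor from the question and assuming the 0.x-era committableSource API (commitScaladsl was the commit call documented at the time), might look like this; it is not part of the original answer:

// Hedged sketch: commit each message's offset only after its ask completes
// (requires an implicit ExecutionContext for the Future.map).
val done = consumerSource
  .mapAsync(parallelism = 5) { message =>
    (messageProcessor ? s"${message.record.value()}")
      .map(_ => message.committableOffset)
  }
  .mapAsync(parallelism = 1)(_.commitScaladsl())
  .runWith(Sink.ignore)

If you go this route, drop enable.auto.commit = true from the configuration.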


Thanks for the solution. Saved my day. – Deepakkumar