2014-02-26 49 views
8

如何使用 Kafka 0.8 的 Log4j appender?我想運行 Kafka-0.8 的 Log4j appender,但一直沒能成功。我希望我的應用程序通過 Log4j appender 直接把日誌發送到 Kafka。

這是我的 log4j.properties。我找不到任何合適的編碼器,所以就讓它使用默認編碼器(即我把那一行註釋掉了)。

log4j.rootLogger=INFO, stdout, KAFKA

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n

log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA.layout.ConversionPattern=%-5p: %c - %m%n
log4j.appender.KAFKA.BrokerList=hnode01:9092
log4j.appender.KAFKA.Topic=DKTestEvent

# The Kafka producer logs through log4j itself. With only the rootLogger
# configured, those log lines are fed back into the KAFKA appender, which
# triggers more producer logging -> infinite loop -> StackOverflowError.
# Route Kafka's internal logging to stdout only, and disable additivity so
# it is NOT also sent to the rootLogger's KAFKA appender.
log4j.logger.kafka=INFO, stdout
log4j.additivity.kafka=false

#log4j.appender.KAFKA.SerializerClass=kafka.log4j.AppenderStringEncoder

這是我的示例應用程序。

import org.apache.log4j.Logger; 
import org.apache.log4j.BasicConfigurator; 
import org.apache.log4j.PropertyConfigurator; 

public class HelloWorld { 

     /** Logger for this class; appenders are supplied by the file passed in args[0]. */
     static Logger logger = Logger.getLogger(HelloWorld.class.getName()); 

     /**
      * Loads a log4j configuration from the file named by the first command-line
      * argument, then emits a few sample log events through it.
      *
      * @param args args[0] must be the path to a log4j.properties file
      */
     public static void main(String[] args) { 
      // Guard: the original indexed args[0] directly and threw
      // ArrayIndexOutOfBoundsException when run without arguments.
      if (args.length < 1) {
       System.err.println("Usage: HelloWorld <path-to-log4j.properties>");
       System.exit(1);
      }
      PropertyConfigurator.configure(args[0]); 

      logger.info("Entering application."); 
      logger.debug("Debugging!."); 
      logger.info("Exiting application."); 
     } 
} 

我用 maven 編譯。我在 pom.xml 中包含了 kafka_2.8.2-0.8.0 和 log4j_1.2.17。

而且我得到這些錯誤:

INFO [main] (Logging.scala:67) - Verifying properties 
INFO [main] (Logging.scala:67) - Property metadata.broker.list is overridden to hnode01:9092 
INFO [main] (Logging.scala:67) - Property serializer.class is overridden to kafka.serializer.StringEncoder 
INFO [main] (HelloWorld.java:14) - Entering application. 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 0 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 1 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 2 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 3 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 4 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 5 for 1 topic(s) Set(DKTestEvent) 
. 
. 
. 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 60 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 61 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 62 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 63 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 64 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 65 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 66 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 67 for 1 topic(s) Set(DKTestEvent) 
. 
. 
. 
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 534 for 1 topic(s) Set(DKTestEvent) 
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
java.lang.StackOverflowError 
    at java.lang.ClassLoader.defineClass1(Native Method) 
    at java.lang.ClassLoader.defineClass(ClassLoader.java:643) 
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 
at java.net.URLClassLoader.defineClass(URLClassLoader.java:277) 
at java.net.URLClassLoader.access$000(URLClassLoader.java:73) 
at java.net.URLClassLoader$1.run(URLClassLoader.java:212) 
at java.security.AccessController.doPrivileged(Native Method) 
at java.net.URLClassLoader.findClass(URLClassLoader.java:205) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:323) 
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:268) 
at java.lang.ClassLoader.defineClass1(Native Method) 
at java.lang.ClassLoader.defineClass(ClassLoader.java:643) 
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 
at java.net.URLClassLoader.defineClass(URLClassLoader.java:277) 
at java.net.URLClassLoader.access$000(URLClassLoader.java:73) 
at java.net.URLClassLoader$1.run(URLClassLoader.java:212) 
at java.security.AccessController.doPrivileged(Native Method) 
at java.net.URLClassLoader.findClass(URLClassLoader.java:205) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:323) 
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:268) 
at org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:87) 
at org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:413) 
at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:313) 
at org.apache.log4j.WriterAppender.append(WriterAppender.java:162) 
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251) 
at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66) 
at org.apache.log4j.Category.callAppenders(Category.java:206) 
at org.apache.log4j.Category.forcedLog(Category.java:391) 
at org.apache.log4j.Category.error(Category.java:322) 
at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105) 
at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105) 
at kafka.utils.Utils$.swallow(Utils.scala:189) 
at kafka.utils.Logging$class.swallowError(Logging.scala:105) 
at kafka.utils.Utils$.swallowError(Utils.scala:46) 
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67) 
at kafka.producer.Producer.send(Producer.scala:76) 
at kafka.producer.KafkaLog4jAppender.append(KafkaLog4jAppender.scala:96) 
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251) 
at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66) 
at org.apache.log4j.Category.callAppenders(Category.java:206) 
at org.apache.log4j.Category.forcedLog(Category.java:391) 
at org.apache.log4j.Category.info(Category.java:666) 
at kafka.utils.Logging$class.info(Logging.scala:67) 
at kafka.client.ClientUtils$.info(ClientUtils.scala:31) 
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51) 
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) 
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67) 
at kafka.utils.Utils$.swallow(Utils.scala:187) 
at kafka.utils.Logging$class.swallowError(Logging.scala:105) 
at kafka.utils.Utils$.swallowError(Utils.scala:46) 
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67) 
at kafka.producer.Producer.send(Producer.scala:76) 
at kafka.producer.KafkaLog4jAppender.append(KafkaLog4jAppender.scala:96) 
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251) 
at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66) 
. 
. 
. 

如果我不終止程序,就會不斷收到上述錯誤。

如果我錯過了什麼,請讓我知道。

回答

1

嘗試將 appender 設置爲異步發送,就像這樣:log4j.appender.KAFKA.ProducerType=async

這看起來是合理的:它會進入無限循環,因爲 Kafka 生產者自身也在用 log4j 記錄日誌……

5

我認爲 Jonas 已經找到了問題所在:Kafka 生產者自身的日誌也被記錄到了 Kafka appender,導致無限循環並最終堆棧溢出(並非雙關)。你可以把所有 Kafka 的日誌配置到另一個 appender。下面展示瞭如何把它們輸出到 stdout:

log4j.logger.kafka=INFO, stdout 

所以,你應該在你的log4j.properties

# Send Kafka's own client logging to stdout only, so it cannot feed back
# into the KAFKA appender and loop; only HelloWorld's logger writes to Kafka.
log4j.rootLogger=INFO, stdout, KAFKA 
log4j.logger.kafka=INFO, stdout 
log4j.logger.HelloWorld=INFO, KAFKA 
2

我已經能夠用下面的方式通過 log4j 向 Kafka 0.8.2.2 發送事件。這是我的 log4j 配置:

<?xml version="1.0" encoding="UTF-8" ?> 
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> 

<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> 

    <!-- Console appender: pattern-formatted output to System.out. -->
    <appender name="console" class="org.apache.log4j.ConsoleAppender"> 
     <param name="Target" value="System.out" /> 
     <layout class="org.apache.log4j.PatternLayout"> 
     <param name="ConversionPattern" value="%-5p %c{1} - %m%n" /> 
     </layout> 
    </appender> 
    <!-- Rolling file appender: INFO and above, up to 100 backup files. -->
    <appender name="fileAppender" class="org.apache.log4j.RollingFileAppender"> 
     <param name="Threshold" value="INFO" /> 
     <param name="MaxBackupIndex" value="100" /> 
     <param name="File" value="/tmp/agna-LogFile.log" /> 
     <layout class="org.apache.log4j.PatternLayout"> 
     <param name="ConversionPattern" value="%d %-5p [%c{1}] %m %n" /> 
     </layout> 
    </appender> 
    <!-- Kafka appender: each event logged through it is produced to the
         "kafkatopic" topic on localhost:9092; syncSend=true makes the
         appender wait for the send to complete. -->
    <appender name="kafkaAppender" class="kafka.producer.KafkaLog4jAppender"> 
     <param name="Topic" value="kafkatopic" /> 
     <param name="BrokerList" value="localhost:9092" /> 
     <param name="syncSend" value="true" /> 
     <layout class="org.apache.log4j.PatternLayout"> 
     <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L %% - %m%n" /> 
     </layout> 
    </appender> 
    <!-- Kafka's own client logging goes to the console only; routing it to
         kafkaAppender would create the feedback loop from the question. -->
    <logger name="org.apache.kafka"> 
     <level value="error" /> 
     <appender-ref ref="console" /> 
    </logger> 
    <!-- Only events logged via this named logger are sent to Kafka. -->
    <logger name="com.example.kafkaLogger"> 
     <level value="debug" /> 
     <appender-ref ref="kafkaAppender" /> 
    </logger> 
    <root> 
     <priority value="debug" /> 
     <appender-ref ref="console" /> 
    </root> 
</log4j:configuration> 

這是源代碼:

package com.example; 

import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import org.json.simple.JSONArray; 
import org.json.simple.JSONObject; 
import java.util.Properties; 
import java.util.concurrent.ExecutionException; 
import org.apache.kafka.clients.producer.ProducerConfig; 
import org.apache.kafka.clients.producer.KafkaProducer; 

import org.apache.kafka.clients.producer.ProducerRecord; 
import org.apache.kafka.common.serialization.StringSerializer; 

public class JsonProducer { 
    /** General application logger (console per the XML config). */
    static Logger defaultLogger = LoggerFactory.getLogger(JsonProducer.class); 
    /** Named logger routed to the Kafka appender by the log4j configuration. */
    static Logger kafkaLogger = LoggerFactory.getLogger("com.example.kafkaLogger"); 

    public static void main(String args[]) { 

     JsonProducer obj = new JsonProducer(); 

     String str = obj.getJsonObjAsString(); 

     // Send the JSON payload to Kafka via the log4j Kafka appender.
     kafkaLogger.info(str); 

     try { 
      // Also send a message through the plain producer API.
      obj.constructAndSendMessage(); 
     } catch (InterruptedException e) { 
      // Restore the interrupt status so callers can still observe it.
      Thread.currentThread().interrupt();
      defaultLogger.error("Caught interrupted exception " + e); 
     } catch (ExecutionException e) { 
      defaultLogger.error("Caught execution exception " + e); 
     } 
    } 

    /**
     * Builds a small sample JSON document and returns it serialized as a String.
     */
    private String getJsonObjAsString() { 
     JSONObject obj = new JSONObject(); 
     obj.put("name", "John"); 
     // Integer.valueOf instead of the deprecated new Integer(...) constructor.
     obj.put("age", Integer.valueOf(55)); 
     obj.put("address", "123 MainSt, Palatine, IL"); 

     JSONArray list = new JSONArray(); 
     list.add("msg 1"); 
     list.add("msg 2"); 
     list.add("msg 3"); 

     obj.put("messages", list); 

     return obj.toJSONString(); 
    } 

    /**
     * Creates a Kafka producer, sends one record to "kafkatopic", and closes
     * the producer on all paths.
     *
     * @throws InterruptedException if the synchronous send is interrupted
     * @throws ExecutionException if the synchronous send fails
     */
    private void constructAndSendMessage() throws InterruptedException, ExecutionException { 
     Properties props = new Properties(); 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 

     // try-with-resources: the original only closed the producer on the happy
     // path, leaking its network threads if send()/get() threw.
     try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props)) { 
      boolean sync = false; 
      String topic = "kafkatopic"; 
      String key = "mykey"; 
      String value = "myvalue1 mayvalue2 myvalue3"; 
      ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, key, value); 
      if (sync) { 
       // Block until the broker acknowledges the record.
       producer.send(producerRecord).get(); 
      } else { 
       // Fire-and-forget; delivery happens asynchronously.
       producer.send(producerRecord); 
      } 
     } 
    } 
} 

整個項目可以通過下面的鏈接獲得:

https://github.com/ypant/kafka-json-producer.git