2013-08-27 56 views
7

我通過卡夫卡快速入門工作:Kafka Quickstart:我需要什麼依賴關係?

http://kafka.apache.org/07/quickstart.html

和基本消費羣例如:

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

我已經編寫了消費者和ConsumerThreadPool如上:

import kafka.consumer.KafkaStream; 
import kafka.consumer.ConsumerIterator; 

public class Consumer implements Runnable { 

    private KafkaStream m_stream; 
    private Integer m_threadNumber; 

    public Consumer(KafkaStream a_stream, Integer a_threadNumber) { 
     m_threadNumber = a_threadNumber; 
     m_stream = a_stream; 
    } 

    public void run() { 
     ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); 
     while (it.hasNext()) { 
      System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message())); 

     } 
     System.out.println("Shutting down Thread: " + m_threadNumber); 
    } 
} 

其他方面:我是usi ng spring來管理我的動物園管理員:

import javax.inject.Named; 
import java.util.Properties; 
import kafka.consumer.ConsumerConfig; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.ComponentScan; 
import org.springframework.context.annotation.Configuration; 

@Configuration 
@ComponentScan("com.truecar.inventory.worker.core") 
public class AppConfig { 

    @Bean 
    @Named("consumerConfig") 
    private static ConsumerConfig createConsumerConfig() { 
     String zookeeperAddress = "127.0.0.1:2181"; 
     String groupId = "inventory"; 
     Properties props = new Properties(); 
     props.put("zookeeper.connect", zookeeperAddress); 
     props.put("group.id", groupId); 
     props.put("zookeeper.session.timeout.ms", "400"); 
     props.put("zookeeper.sync.time.ms", "200"); 
     props.put("auto.commit.interval.ms", "1000"); 
     return new ConsumerConfig(props); 
    } 
} 

我正在編譯Maven和OneJar Maven插件。但是,我編譯並運行所產生的一個罐子,我得到以下錯誤:

Aug 26, 2013 6:15:41 PM org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider registerDefaultFilters 
INFO: JSR-330 'javax.inject.Named' annotation found and supported for component scanning 
Exception in thread "main" java.lang.reflect.InvocationTargetException 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at com.simontuffs.onejar.Boot.run(Boot.java:340) 
at com.simontuffs.onejar.Boot.main(Boot.java:166) 
Caused by: java.lang.NoClassDefFoundError: scala/ScalaObject 
at java.lang.ClassLoader.defineClass1(Native Method) 
at java.lang.ClassLoader.defineClass(ClassLoader.java:792) 
at com.simontuffs.onejar.JarClassLoader.defineClass(JarClassLoader.java:803) 
at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:710) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
at java.lang.Class.getDeclaredMethods0(Native Method) 
at java.lang.Class.privateGetDeclaredMethods(Class.java:2521) 
at java.lang.Class.getDeclaredMethods(Class.java:1845) 
at org.springframework.core.type.StandardAnnotationMetadata.getAnnotatedMethods(StandardAnnotationMetadata.java:180) 
at org.springframework.context.annotation.ConfigurationClassParser.doProcessConfigurationClass(ConfigurationClassParser.java:222) 
at org.springframework.context.annotation.ConfigurationClassParser.processConfigurationClass(ConfigurationClassParser.java:165) 
at org.springframework.context.annotation.ConfigurationClassParser.parse(ConfigurationClassParser.java:140) 
at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:282) 
at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:223) 
at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:630) 
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:461) 
at org.springframework.context.annotation.AnnotationConfigApplicationContext.<init>(AnnotationConfigApplicationContext.java:73) 
at com.truecar.inventory.worker.core.consumer.ConsumerThreadPool.<clinit>(ConsumerThreadPool.java:31) 
at com.truecar.inventory.worker.core.application.Starter.main(Starter.java:20) 
... 6 more 
Caused by: java.lang.ClassNotFoundException: scala.ScalaObject 
at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:713) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
... 27 more 

現在,我不太瞭解卡夫卡,並沒有什麼關於斯卡拉。我該如何解決?接下來我應該嘗試什麼?這是一個已知的問題?我需要其他依賴關係嗎?下面是卡夫卡的版本在我的pom.xml:

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.9.2</artifactId> 
    <version>0.8.0-beta1</version> 
</dependency> 

更新:我聯繫了卡夫卡dev郵件列表,他們讓我知道斯卡拉依賴一些特定的版本要求。但是,還有一個未記錄的log4j依賴項,它導致另一個運行時,而不是編譯時,異常。

Exception in thread "main" java.lang.reflect.InvocationTargetException 
Caused by: java.lang.NoSuchMethodError: ch.qos.logback.classic.Logger.filterAndLog(Ljava/lang/String;Lorg/slf4j/Marker;Lch/qos/logback/classic/Level;Ljava/lang/String;[Ljava/lang/Object;Ljava/lang/Throwable;)V 
at org.apache.log4j.Category.log(Category.java:333) 
at org.apache.commons.logging.impl.Log4JLogger.debug(Log4JLogger.java:177) 

另一個更新:

我找到了正確的log4j的依賴性:

<dependency> 
     <groupId>log4j</groupId> 
     <artifactId>log4j</artifactId> 
     <version>1.2.17</version> 
    </dependency> 

但現在我遇到了一個更加神祕的運行時異常:

Exception in thread "main" java.lang.reflect.InvocationTargetException 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at com.simontuffs.onejar.Boot.run(Boot.java:340) 
at com.simontuffs.onejar.Boot.main(Boot.java:166) 
Caused by: java.lang.NoClassDefFoundError: org/I0Itec/zkclient/IZkStateListener 
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64) 
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66) 
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100) 
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala) 

在這點我得到了WTF的那種感覺。於是,我又增加了依賴性:

<dependency> 
     <groupId>com.101tec</groupId> 
     <artifactId>zkclient</artifactId> 
     <version>0.3</version> 
    </dependency> 

但這暴露的另一個運行時異常:

Exception in thread "main" java.lang.reflect.InvocationTargetException 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at com.simontuffs.onejar.Boot.run(Boot.java:340) 
at com.simontuffs.onejar.Boot.main(Boot.java:166) 
Caused by: java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge 
at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:146) 
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:113) 
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64) 
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66) 
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100) 
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala) 

我希望能夠得到這個寶貝例如啓動和運行,但也許這是價格支付使用測試版產品?也許我應該切換到Apache Active MQ。但這聽起來不太有趣。我錯過了什麼嗎?

回答

9

問題是kafka beta was built in a way that pom generated with a jar isn't valid and maven could not recognize it and parse properly,從而獲取傳遞依賴。我們設法通過在我們的pom定義中引入來自該pom(scala,zk等)的所有依賴關係來緩解這個問題。我們正在等待卡夫卡的下一個測試版本,其中的問題將得到解決。

完整的依賴關係列表如下。請注意,您必須相應地將scala版本依賴項更改爲您的kafka工件的後綴。

<dependency> 
      <groupId>org.scala-lang</groupId> 
      <artifactId>scala-library</artifactId> 
      <version>2.8.0</version> 
     </dependency> 
     <dependency> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
      <version>1.2.15</version> 
      <exclusions> 
       <exclusion> 
        <groupId>com.sun.jmx</groupId> 
        <artifactId>jmxri</artifactId> 
       </exclusion> 
       <exclusion> 
        <groupId>com.sun.jdmk</groupId> 
        <artifactId>jmxtools</artifactId> 
       </exclusion> 
       <exclusion> 
        <groupId>javax.jms</groupId> 
        <artifactId>jms</artifactId> 
       </exclusion> 
      </exclusions> 
     </dependency> 
     <dependency> 
      <groupId>net.sf.jopt-simple</groupId> 
      <artifactId>jopt-simple</artifactId> 
      <version>3.2</version> 
     </dependency> 
     <dependency> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-simple</artifactId> 
      <version>1.6.4</version> 
     </dependency> 
     <dependency> 
      <groupId>org.scala-lang</groupId> 
      <artifactId>scala-compiler</artifactId> 
      <version>2.8.0</version> 
     </dependency> 
     <dependency> 
      <groupId>com.101tec</groupId> 
      <artifactId>zkclient</artifactId> 
      <version>0.3</version> 
     </dependency> 
     <dependency> 
      <groupId>com.yammer.metrics</groupId> 
      <artifactId>metrics-core</artifactId> 
      <version>2.2.0</version> 
     </dependency> 
     <dependency> 
      <groupId>com.yammer.metrics</groupId> 
      <artifactId>metrics-annotation</artifactId> 
      <version>2.2.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.easymock</groupId> 
      <artifactId>easymock</artifactId> 
      <version>3.0</version> 
      <scope>test</scope> 
     </dependency> 
     <dependency> 
      <groupId>org.scalatest</groupId> 
      <artifactId>scalatest</artifactId> 
      <version>1.2</version> 
      <scope>test</scope> 
     </dependency> 

對於

Maybe I should switch to Apache Active MQ. But that sounds less fun. Am I missing something?

嘛,難道你忘了,這是測試釋放?實際上,一些不好的事情正在發生,但目前我們正在運行kafka 0.7而沒有任何努力

+0

真棒,謝謝你的回答。我想嘗試0.7,但Maven只有0.8罐可用。您在程序訪問方面會推薦什麼? –

+0

@DavidWilliams我們使用[kafka由twitter構建](http://search.maven.org/#artifactdetails%7Ccom.twitter%7Ckafka_2.9.2%7C0.7.0%7Cjar)爲0.7。 *程序訪問*意味着什麼? –

+0

啊,應該更具體一些,用Java。卡夫卡唯一的Maven文物是0.8 –

3

我發現依賴這種配置是功能性:

<dependencies> 
    <dependency> 
     <groupId>org.springframework</groupId> 
     <artifactId>spring-core</artifactId> 
     <version>3.2.4.RELEASE</version> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework</groupId> 
     <artifactId>spring-context</artifactId> 
     <version>3.2.4.RELEASE</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.9.2</artifactId> 
     <version>0.8.0-beta1</version> 
    </dependency> 
    <dependency> 
     <groupId>javax.inject</groupId> 
     <artifactId>javax.inject</artifactId> 
     <version>1</version> 
    </dependency> 
    <dependency> 
     <groupId>org.scala-lang</groupId> 
     <artifactId>scala-library</artifactId> 
     <version>2.9.2</version> 
    </dependency> 
    <dependency> 
     <groupId>log4j</groupId> 
     <artifactId>log4j</artifactId> 
     <version>1.2.17</version> 
    </dependency> 
    <dependency> 
     <groupId>com.101tec</groupId> 
     <artifactId>zkclient</artifactId> 
     <version>0.3</version> 
    </dependency> 
    <dependency> 
     <groupId>com.yammer.metrics</groupId> 
     <artifactId>metrics-core</artifactId> 
     <version>2.2.0</version> 
    </dependency> 
</dependencies> 
相關問題