我通過卡夫卡快速入門工作:Kafka Quickstart:我需要什麼依賴關係?
http://kafka.apache.org/07/quickstart.html
和基本消費羣例如:
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
我已經編寫了消費者和ConsumerThreadPool如上:
import kafka.consumer.KafkaStream;
import kafka.consumer.ConsumerIterator;
public class Consumer implements Runnable {
private KafkaStream m_stream;
private Integer m_threadNumber;
public Consumer(KafkaStream a_stream, Integer a_threadNumber) {
m_threadNumber = a_threadNumber;
m_stream = a_stream;
}
public void run() {
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext()) {
System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
}
System.out.println("Shutting down Thread: " + m_threadNumber);
}
}
其他方面:我是usi ng spring來管理我的動物園管理員:
import javax.inject.Named;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@Configuration
@ComponentScan("com.truecar.inventory.worker.core")
public class AppConfig {
@Bean
@Named("consumerConfig")
private static ConsumerConfig createConsumerConfig() {
String zookeeperAddress = "127.0.0.1:2181";
String groupId = "inventory";
Properties props = new Properties();
props.put("zookeeper.connect", zookeeperAddress);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
}
我正在編譯Maven和OneJar Maven插件。但是,我編譯並運行所產生的一個罐子,我得到以下錯誤:
Aug 26, 2013 6:15:41 PM org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider registerDefaultFilters
INFO: JSR-330 'javax.inject.Named' annotation found and supported for component scanning
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.simontuffs.onejar.Boot.run(Boot.java:340)
at com.simontuffs.onejar.Boot.main(Boot.java:166)
Caused by: java.lang.NoClassDefFoundError: scala/ScalaObject
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:792)
at com.simontuffs.onejar.JarClassLoader.defineClass(JarClassLoader.java:803)
at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:710)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2521)
at java.lang.Class.getDeclaredMethods(Class.java:1845)
at org.springframework.core.type.StandardAnnotationMetadata.getAnnotatedMethods(StandardAnnotationMetadata.java:180)
at org.springframework.context.annotation.ConfigurationClassParser.doProcessConfigurationClass(ConfigurationClassParser.java:222)
at org.springframework.context.annotation.ConfigurationClassParser.processConfigurationClass(ConfigurationClassParser.java:165)
at org.springframework.context.annotation.ConfigurationClassParser.parse(ConfigurationClassParser.java:140)
at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:282)
at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:223)
at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:630)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:461)
at org.springframework.context.annotation.AnnotationConfigApplicationContext.<init>(AnnotationConfigApplicationContext.java:73)
at com.truecar.inventory.worker.core.consumer.ConsumerThreadPool.<clinit>(ConsumerThreadPool.java:31)
at com.truecar.inventory.worker.core.application.Starter.main(Starter.java:20)
... 6 more
Caused by: java.lang.ClassNotFoundException: scala.ScalaObject
at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:713)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 27 more
現在,我不太瞭解卡夫卡,並沒有什麼關於斯卡拉。我該如何解決?接下來我應該嘗試什麼?這是一個已知的問題?我需要其他依賴關係嗎?下面是卡夫卡的版本在我的pom.xml:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.0-beta1</version>
</dependency>
更新:我聯繫了卡夫卡dev郵件列表,他們讓我知道斯卡拉依賴一些特定的版本要求。但是,還有一個未記錄的log4j依賴項,它導致另一個運行時,而不是編譯時,異常。
Exception in thread "main" java.lang.reflect.InvocationTargetException
Caused by: java.lang.NoSuchMethodError: ch.qos.logback.classic.Logger.filterAndLog(Ljava/lang/String;Lorg/slf4j/Marker;Lch/qos/logback/classic/Level;Ljava/lang/String;[Ljava/lang/Object;Ljava/lang/Throwable;)V
at org.apache.log4j.Category.log(Category.java:333)
at org.apache.commons.logging.impl.Log4JLogger.debug(Log4JLogger.java:177)
另一個更新:
我找到了正確的log4j的依賴性:
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
但現在我遇到了一個更加神祕的運行時異常:
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.simontuffs.onejar.Boot.run(Boot.java:340)
at com.simontuffs.onejar.Boot.main(Boot.java:166)
Caused by: java.lang.NoClassDefFoundError: org/I0Itec/zkclient/IZkStateListener
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
在這點我得到了WTF的那種感覺。於是,我又增加了依賴性:
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.3</version>
</dependency>
但這暴露的另一個運行時異常:
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.simontuffs.onejar.Boot.run(Boot.java:340)
at com.simontuffs.onejar.Boot.main(Boot.java:166)
Caused by: java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge
at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:146)
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:113)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
我希望能夠得到這個寶貝例如啓動和運行,但也許這是價格支付使用測試版產品?也許我應該切換到Apache Active MQ。但這聽起來不太有趣。我錯過了什麼嗎?
真棒,謝謝你的回答。我想嘗試0.7,但Maven只有0.8罐可用。您在程序訪問方面會推薦什麼? –
@DavidWilliams我們使用[kafka由twitter構建](http://search.maven.org/#artifactdetails%7Ccom.twitter%7Ckafka_2.9.2%7C0.7.0%7Cjar)爲0.7。 *程序訪問*意味着什麼? –
啊,應該更具體一些,用Java。卡夫卡唯一的Maven文物是0.8 –