2016-11-10 34 views
0

在過去6個月中,我是我們在kafka-0.8.1.1之上編寫的解決方案的開發人員。它對我們來說很穩定。我們認爲我們會升級到kafka-0.9.0.1。 隨着服務器升級,我們沒有遇到任何問題。kafka_2.11-0.9.0.1和scala問題2.11.7

我們有自己的解決方案,可以提取郵件並寫入不同的目的地,也可以通過風暴來讀取郵件。對於我們的單元測試,我們使用以下maven神器

<groupId>org.apache.kafka</groupId> 
<artifactId>kafka_2.9.2</artifactId> 
<version>0.8.1.1</version> 

我找不到,0.9.0.1版本爲kafka_2.9.2。因此我首先轉到了kafka_2.11。這是所使用的神器:

<groupId>org.apache.kafka</groupId> 
<artifactId>kafka_2.11</artifactId> 
<version>0.9.0.1</version> 

我也陷入了以下問題:

scala.ScalaObject not found issue 
java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object; 
kafkaConfig<init> issue with NoSuchMethodError (Ljava/util/map;)Ljava/util/map 

而且大部分的時間,我會碰上KafkaServerStartable(無論是在kafka_2.10-0.9.0.1和kafka_2 .11-0.9.0.1)掛起問題。但是通過相同的單元測試,我從來沒有因爲kafka_2.9.2而陷入kafka服務器掛起問題。

你能幫我解決我的問題嗎? 我錯過了什麼嗎?

+0

檢查你的類路徑,看是否存在任何2.10版本的scala庫。這可能是由版本衝突造成的。 – amethystic

+0

*當*和*在哪裏*你遇到「ScalaObject找不到問題」?在您的自定義應用程序中,在風暴/風暴的卡夫卡噴嘴中,......? –

+0

當我運行風暴kafka噴口測試時,它使用KafkaServerStartable啓動kafka服務器進行單元測試。使用下面的配置,它會嘗試啓動並運行到KafkaConfig錯誤。 – DivH

回答

0

這不是一個答案。關於我的問題跟進: 這是,現有的代碼使用開始測試服務器的卡夫卡配置: 依賴,我試圖:

<groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> # in this, scala 11.4, 11.7 used alternativel to verify <version>0.9.0.1</version>

<groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> # in this scala 10.4 is used <version>0.9.0.1</version>

公共KafkaTestServer(INT端口, ZookeeperTestServer zkServer,String brokerId,int defaultPartitionCount)拋出異常{this.zkServer = zkServer;

KafkaConfig config = getKafkaConfig(zkServer.getConnectString(), port, brokerId, defaultPartitionCount); 
    kafkaServer = new KafkaServerStartable(config); 

    kafkaServer.startup(); 
    ProducerConfig conf = new ProducerConfig(getProducerConfig(getKafkaBrokerString())); 
    producer = new Producer<>(conf); 

} 

private KafkaConfig getKafkaConfig(String zkConnectString, int port, String brokerId, int defaultPartitionCount) { 
    Properties props = new Properties(); 
    props.setProperty("zookeeper.connect", zkConnectString); 
    props.setProperty("broker.id", brokerId); 
    props.setProperty("port", Integer.toString(port)); 
    createKafkaDataDirectory(); 
    props.setProperty("log.dirs", dataDirectory.getAbsolutePath()); 
    props.setProperty("num.partitions", Integer.toString(defaultPartitionCount)); 
    props.setProperty("retry.backoff.ms", "500"); 

    return new KafkaConfig(props, false); 
}