2017-10-28 86 views
1

我試圖啓動一個卡夫卡服務器形式的Java創建`KafkaServer`從Java

具體來說,我怎麼能翻譯斯卡拉的this line入行渣華的?

private val server = new KafkaServer(serverConfig, kafkaMetricsReporters = reporters) 

我可以輕鬆地創建serverConfig中,但我似乎無法能夠創建kafkaMetricsReporters參數。

注:我可以創建一個KafkaServerStartable,但我想創建一個正常的KafkaServer以避免JVM在發生錯誤時退出。

Apache的版本卡夫卡0.11.0.1

回答

1

kafkaMetricsReporters參數是斯卡拉Seq

您可以:

  1. 創建一個Java集合並將其轉換成一個序列:

    您需要導入scala.collection.JavaConverters

    List<KafkaMetricsReporter> reportersList = new ArrayList<>(); 
    ... 
    Seq<KafkaMetricsReporter> reportersSeq = JavaConverters.asScalaBufferConverter(reportersList).asScala(); 
    
  2. 使用KafkaMetricsReporter.startReporters()方法來創建它們對於您的配置:

    作爲KafkaMetricsReporter是單身,你需要使用MODULE符號來使用它:

    Seq<KafkaMetricsReporter> reporters = KafkaMetricsReporter$.MODULE$.startReporters(new VerifiableProperties(props)); 
    

另外,KafkaServer構造有從Java調用它時需要2個其他參數:

  • time可以很容易地使用new org.apache.kafka.common.utils.SystemTime()
  • threadNamePrefix是一個選項。如果導入scala.Option,你就可以調用Option.apply("prefix")

全部放在一起:

Properties props = new Properties(); 
props.put(...); 
KafkaConfig config = KafkaConfig.fromProps(props); 
Seq<KafkaMetricsReporter> reporters = KafkaMetricsReporter$.MODULE$.startReporters(new VerifiableProperties(props)); 
KafkaServer server = new KafkaServer(config, new SystemTime(), Option.apply("prefix"), reporters); 
server.startup(); 
+0

謝謝:) 兩個問題:你的意思是'reportersList',而不是'reporters'? 'new SystemTime()'來自哪裏(哪個包)? – nha

+0

另外,這意味着不會有記者,對嗎? (目前我很好,雖然我不得不在某些時候創建一些) – nha

+1

我已經更新了我的答案。 'startReporters()'可能是你想用的 –