2016-08-05 27 views
0

我正在將Play v 2.3.4應用程序遷移到Play v 2.5.4。一路上,我不得不升級到Scala 2.11.8和kafka 9.0+以支持更新的Play版本。由於無效的ZKStringSerializer引用而無法Instanciate ZkClient參考

我已經解決了大部分問題,但是我找不到一些通過AdminUtils管理Kafka主題的代碼的卡夫卡問題。麻煩都集中在kafka.utils.ZkStringSerialzier周圍。

我正在使用org.I0Itec.zkclient包實例ZkClient對象,它是在ZkUtils對象的構造中傳遞的,但它因爲無法解析我的ZkStringSerializer而失敗。

相關的代碼是:在ZKStringSerializer是從他處人跡罕至的錯誤

import kafka.admin.AdminUtils 
import kafka.utils.ZkUtils 
import kafka.utils.ZKStringSerializer 
import org.I0Itec.zkclient.{ZkClient, ZkConnection} 
object Topic { 
    def CreateKafkaTopic(topic: String, zookeeperHosts: String, partitionSize: Int, replicationCount: Int, connectionTimeoutMs: Int = 10000, sessionTimeoutMs: Int = 10000): Boolean = { 
     var zkSerializer: ZKStringSerializer = ZKStringSerializer 
     val zkClient: ZkClient= new ZkClient(zookeeperHosts, connectionTimeoutMs, sessionTimeoutMs, zkSerializer) 
     val topicConfig: Properties = new Properties() 
     val isSecureKafkaCluster: Boolean = false 

     val zkUtils: ZkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), isSecureKafkaCluster) 

     AdminUtils.createTopic(zkUtils, topic, partitionSize, replicationCount, topicConfig) 
     zkClient.close() 
    } 
} 

上面的代碼的結果。

我發現了幾個相關的職位,以創建主題(主要在Java和卡夫卡9.0之前) Creating a topic for Apache Kafka 0.9 Using Java How create Kafka ZKStringSerializer in Java? How Can we create a topic in Kafka from the IDE using API 最後 Creating a Kafka topic results in no leader

基於這些我通過代碼更新如下:

import kafka.admin.AdminUtils 
import kafka.utils.ZkUtils 
import kafka.utils.ZKStringSerializer$ 
import org.I0Itec.zkclient.{ZkClient, ZkConnection} 
object Topic { 
    def CreateKafkaTopic(topic: String, zookeeperHosts: String, partitionSize: Int, replicationCount: Int, connectionTimeoutMs: Int = 10000, sessionTimeoutMs: Int = 10000): Boolean = { 
     var zkSerializer: ZKStringSerializer = ZKStringSerializer$.MODULE$ 
     val zkClient: ZkClient= new ZkClient(zookeeperHosts, connectionTimeoutMs, sessionTimeoutMs, zkSerializer) 
     val topicConfig: Properties = new Properties() 
     val isSecureKafkaCluster: Boolean = false 

     val zkUtils: ZkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), isSecureKafkaCluster) 

     AdminUtils.createTopic(zkUtils, topic, partitionSize, replicationCount, topicConfig) 
     zkClient.close() 
    } 
} 

然後我只是無法解決符號ZkStringSerialzer $錯誤。

我嘗試了org.I0Itec.zkclient.serialize.ZkSerializer對象,它沒有什麼區別。

所以我的問題實際上是兩方面的: 1.在scala中導入和聲明語句的'$'字符有什麼意義。我已經在字符串插值中使用它(e/g/s「var value是$ var」)來引用變量,但是這看起來不同。 2.我的代碼有什麼問題。這是我導入,聲明,還是別的嗎?

我是新來斯卡拉和播放,但我感覺像很白癡的時刻,因此任何建議/幫助表示讚賞

〜戴夫

附: 在情況下,它可以幫助相關的位由項目文件

build.sbt:

lazy val `api` = (project in file(".")).enablePlugins(PlayScala) 
scalaVersion := "2.11.8" 

libraryDependencies ++= Seq(
    "org.apache.kafka" % "kafka_2.11" % "0.9.0.1", 
    jdbc, 
    cache, 
    ws, 
    specs2 % Test 
) 

plugins.sbt:

resolvers += "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/" 

addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.5.4") 

addSbtPlugin("com.typesafe.sbt" % "sbt-coffeescript" % "1.0.0") 

addSbtPlugin("com.typesafe.sbt" % "sbt-less" % "1.0.0") 

addSbtPlugin("com.typesafe.sbt" % "sbt-jshint" % "1.0.1") 

addSbtPlugin("com.typesafe.sbt" % "sbt-rjs" % "1.0.1") 

addSbtPlugin("com.typesafe.sbt" % "sbt-digest" % "1.0.0") 

addSbtPlugin("com.typesafe.sbt" % "sbt-mocha" % "1.0.0") 

build.properties:

sbt.version=0.13.5 

回答

0

戰鬥之後我放棄了之前使用過的ZKClient軟件包,並且簡化了這個問題e直接使用了Kafka,這實際上比使用I0Itech ZKClient更清潔。

新的實現是這樣的:

import java.util.Properties 
import kafka.admin.AdminUtils 
import kafka.utils.ZkUtils 

class Topic { 
    def CreateKafkaTopic(topic: String, zookeeperHosts: String, partitionSize: Int, replicationCount: Int, connectionTimeoutMs: Int = 10000, sessionTimeoutMs: Int = 10000): Boolean = { 
    if (ListKafkaTopics(zookeeperHosts).contains(topic)) { 
     return false 
    } 
    val zkUtils = ZkUtils.apply(zookeeperHosts, sessionTimeoutMs, connectionTimeoutMs, false) 
    AdminUtils.createTopic(zkUtils, topic, partitionSize, replicationCount, new Properties()) 
    zkUtils.close() 
    true 
    } 
} 

末結束取出的依賴,使清潔代碼,這樣一個雙贏的,我想。

〜Dave

+0

您所做的與創建zkUtils一樣,是用'false'替換'ZkStringSerializer'。這是如何運作的? – user2418202

+0

@ user2418202我原來的問題是試圖實例化ZkSerializer傳遞給I0Itec ZkClient。我停止嘗試使用org.I0Itec.zkclient包創建一個ZKClient,並讓它直接由Kafka ZkUtils處理。 – DVS