我正在將Play v 2.3.4應用程序遷移到Play v 2.5.4。一路上,我不得不升級到Scala 2.11.8和kafka 9.0+以支持更新的Play版本。由於無效的ZKStringSerializer引用而無法Instanciate ZkClient參考
我已經解決了大部分問題,但是我找不到一些通過AdminUtils管理Kafka主題的代碼的卡夫卡問題。麻煩都集中在kafka.utils.ZkStringSerialzier周圍。
我正在使用org.I0Itec.zkclient包實例ZkClient對象,它是在ZkUtils對象的構造中傳遞的,但它因爲無法解析我的ZkStringSerializer而失敗。
相關的代碼是:在ZKStringSerializer是從他處人跡罕至的錯誤
import kafka.admin.AdminUtils
import kafka.utils.ZkUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.{ZkClient, ZkConnection}
object Topic {
def CreateKafkaTopic(topic: String, zookeeperHosts: String, partitionSize: Int, replicationCount: Int, connectionTimeoutMs: Int = 10000, sessionTimeoutMs: Int = 10000): Boolean = {
var zkSerializer: ZKStringSerializer = ZKStringSerializer
val zkClient: ZkClient= new ZkClient(zookeeperHosts, connectionTimeoutMs, sessionTimeoutMs, zkSerializer)
val topicConfig: Properties = new Properties()
val isSecureKafkaCluster: Boolean = false
val zkUtils: ZkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), isSecureKafkaCluster)
AdminUtils.createTopic(zkUtils, topic, partitionSize, replicationCount, topicConfig)
zkClient.close()
}
}
上面的代碼的結果。
我發現了幾個相關的職位,以創建主題(主要在Java和卡夫卡9.0之前) Creating a topic for Apache Kafka 0.9 Using Java How create Kafka ZKStringSerializer in Java? How Can we create a topic in Kafka from the IDE using API 最後 Creating a Kafka topic results in no leader
基於這些我通過代碼更新如下:
import kafka.admin.AdminUtils
import kafka.utils.ZkUtils
import kafka.utils.ZKStringSerializer$
import org.I0Itec.zkclient.{ZkClient, ZkConnection}
object Topic {
def CreateKafkaTopic(topic: String, zookeeperHosts: String, partitionSize: Int, replicationCount: Int, connectionTimeoutMs: Int = 10000, sessionTimeoutMs: Int = 10000): Boolean = {
var zkSerializer: ZKStringSerializer = ZKStringSerializer$.MODULE$
val zkClient: ZkClient= new ZkClient(zookeeperHosts, connectionTimeoutMs, sessionTimeoutMs, zkSerializer)
val topicConfig: Properties = new Properties()
val isSecureKafkaCluster: Boolean = false
val zkUtils: ZkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), isSecureKafkaCluster)
AdminUtils.createTopic(zkUtils, topic, partitionSize, replicationCount, topicConfig)
zkClient.close()
}
}
然後我只是無法解決符號ZkStringSerialzer $錯誤。
我嘗試了org.I0Itec.zkclient.serialize.ZkSerializer對象,它沒有什麼區別。
所以我的問題實際上是兩方面的: 1.在scala中導入和聲明語句的'$'字符有什麼意義。我已經在字符串插值中使用它(e/g/s「var value是$ var」)來引用變量,但是這看起來不同。 2.我的代碼有什麼問題。這是我導入,聲明,還是別的嗎?
我是新來斯卡拉和播放,但我感覺像很白癡的時刻,因此任何建議/幫助表示讚賞
〜戴夫
附: 在情況下,它可以幫助相關的位由項目文件
build.sbt:
lazy val `api` = (project in file(".")).enablePlugins(PlayScala)
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka_2.11" % "0.9.0.1",
jdbc,
cache,
ws,
specs2 % Test
)
plugins.sbt:
resolvers += "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/"
addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.5.4")
addSbtPlugin("com.typesafe.sbt" % "sbt-coffeescript" % "1.0.0")
addSbtPlugin("com.typesafe.sbt" % "sbt-less" % "1.0.0")
addSbtPlugin("com.typesafe.sbt" % "sbt-jshint" % "1.0.1")
addSbtPlugin("com.typesafe.sbt" % "sbt-rjs" % "1.0.1")
addSbtPlugin("com.typesafe.sbt" % "sbt-digest" % "1.0.0")
addSbtPlugin("com.typesafe.sbt" % "sbt-mocha" % "1.0.0")
build.properties:
sbt.version=0.13.5
您所做的與創建zkUtils一樣,是用'false'替換'ZkStringSerializer'。這是如何運作的? – user2418202
@ user2418202我原來的問題是試圖實例化ZkSerializer傳遞給I0Itec ZkClient。我停止嘗試使用org.I0Itec.zkclient包創建一個ZKClient,並讓它直接由Kafka ZkUtils處理。 – DVS