To handle Kafka failures, I used the Apache Curator framework with the following workaround:
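import org.apache.curator.CuratorZookeeperClient
import org.apache.curator.framework.CuratorFramework
import scala.util.Try

val client: CuratorFramework = ... // see docs
val zk: CuratorZookeeperClient = client.getZookeeperClient

/**
 * Returns false if Kafka or ZooKeeper is down.
 */
def isKafkaAvailable: Boolean =
  Try {
    if (zk.isConnected) {
      // Each live broker registers an ephemeral node under /brokers/ids.
      val xs = client.getChildren.forPath("/brokers/ids")
      xs.size() > 0
    }
    else false
  }.getOrElse(false)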
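A check like this is typically polled rather than called once. The following is a minimal sketch of such polling, assuming a blocking retry loop is acceptable. `AvailabilityPolling`, `waitUntilAvailable`, `maxAttempts`, and `delayMillis` are hypothetical names introduced for illustration; they are not part of Curator or Kafka.

```scala
import scala.annotation.tailrec

object AvailabilityPolling {
  /**
   * Calls `check` up to `maxAttempts` times, sleeping `delayMillis`
   * between failed attempts. Returns true as soon as a check succeeds,
   * false if every attempt fails.
   * (Hypothetical helper, not part of Curator or Kafka.)
   */
  @tailrec
  def waitUntilAvailable(check: () => Boolean, maxAttempts: Int, delayMillis: Long): Boolean =
    if (maxAttempts <= 0) false
    else if (check()) true
    else {
      // Do not sleep after the final failed attempt.
      if (maxAttempts > 1) Thread.sleep(delayMillis)
      waitUntilAvailable(check, maxAttempts - 1, delayMillis)
    }
}
```

It could be called as `AvailabilityPolling.waitUntilAvailable(() => isKafkaAvailable, maxAttempts = 5, delayMillis = 1000)`. Because it blocks the calling thread, it is better suited to startup code than to an actor's message handler.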
To consume Kafka topics, I used the com.softwaremill.reactivekafka library. For example:
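import akka.actor.{Actor, ActorSystem}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import com.softwaremill.react.kafka.{ConsumerProperties, ReactiveKafka}
import org.apache.kafka.clients.consumer.ConsumerRecord

class KafkaConsumerActor extends Actor {
  // Implicits needed to create and run the stream inside the actor.
  implicit val system: ActorSystem = context.system
  implicit val materializer: ActorMaterializer = ActorMaterializer()(context)

  val kafka = new ReactiveKafka()
  val config: ConsumerProperties[Array[Byte], Any] = ... // see docs

  override def preStart(): Unit = {
    super.preStart()
    val publisher = kafka.consume(config)
    Source.fromPublisher(publisher)
      .map(handleKafkaRecord)
      .to(Sink.ignore).run()
  }

  // The actor only drives the stream; it handles no messages itself.
  def receive: Receive = Actor.emptyBehavior

  /**
   * Invoked for each record consumed from Kafka.
   */
  def handleKafkaRecord(r: ConsumerRecord[Array[Byte], Any]): Unit = {
    // handle record
  }
}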