0
我正在寫sck卡夫卡製作人,我想從斯卡拉卡夫卡客戶端發送消息給卡夫卡經紀人,問題是經紀人沒有得到這些消息我通過驗證從命令行啓動kafka使用者。卡夫卡製片人和消費者在命令提示符下工作正常。 Kafka是Kerberos和SASL_PlainText安全性啓用。發送數據到Kerberos從Scala啓用Kafka集羣客戶端
請在下面找到我的conf文件,客戶端代碼和應用程序日誌。我認爲從代碼連接到Kerberos時一定有一些問題。
斯卡拉客戶:
package com.ABC.adds.producer
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import com.ABC.adds.models.Models.GMMOfaq
import com.ABC.adds.producer.serializer.ModelSerializer
import com.thoughtworks.xstream.XStream
import com.thoughtworks.xstream.io.xml.DomDriver
import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.ByteArraySerializer
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
object faqProducer extends App with LazyLogging{
val config = ConfigFactory.load()
implicit val system = ActorSystem.create("adds-faq-producer", config)
implicit val mat = ActorMaterializer()
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new ModelSerializer[PPOfaq](classOf[PPOfaq]))
.withBootstrapServers("jbt12324.systems.pfk.ABC:3382")
.withProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
.withProperty("zookeeper.connect","jbt234234.systems.pfk.ABC:2341,jbt234.systems.pfk.ABC:234,jbt1234324.systems.pfk.ABC:2234")
.withProperty(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, "1")
val xstream = new XStream(new DomDriver)
val personString: String = scala.io.Source.fromInputStream(getClass().getClassLoader().getResourceAsStream("PPOfaq.xml")).mkString
xstream.alias("faq", classOf[PPOfaq])
val ppofaq: PPOfaq = xstream.fromXML(personString).asInstanceOf[PPOfaq]
logger.info(s"Producer Configuration is : {} ", producerSettings.toString)
logger.info(s"Sending message : {}", ppofaq)
logger.info("KafkaProducer Send first fetching Partitions for topics")
val kafkaProducer = producerSettings.createKafkaProducer()
kafkaProducer.partitionsFor("asp.adds.ppo.pems")
val done1 = kafkaProducer.send(new ProducerRecord[Array[Byte], PPOfaq]("asp.adds.ppo.pems", ppofaq))
val recordMetaData : RecordMetadata = done1.get()
logger.info("Topic is : " + recordMetaData.topic() +" partition is : "+ recordMetaData.partition() +" offset is : "+ recordMetaData.offset())
logger.info("KafkaProdcuer Send first fetching Partitions for topics end")
val done = Source.single(ppofaq)
.map { elem =>
new ProducerRecord[Array[Byte], PPOfaq]("asp.adds.ppo.pems", ppofaq)
}
.runWith(Producer.plainSink(producerSettings))
done onComplete {
case Success(s) => {
logger.info(s"The producer has sent a message to the topic: asp.adds.ppo.pems!!")
}
case Failure(e) => {
logger.error("Erorr occured while producing Topic", e)
e.printStackTrace()
e.fillInStackTrace()
e.getCause
e.getMessage
}
}
}
這是Kafka_Client的conf文件我使用Kerberos身份驗證:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useTicketCache=false
useKeyTab=true
serviceName="kafka"
principal="[email protected]"
keyTab="/home/pqr/.pqr.headless.keytab"
debug=true
client=true;
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useKeyTab=true
useTicketCache=false
serviceName="zookeeper"
principal="[email protected]"
keyTab="/home/pqr/.pqr.headless.keytab"
debug=true;
};
這是當我在羣集中運行我的罐子我得到了應用程序日誌: 應用程序日誌:
[[email protected] ResourceBundle]$ java -Djava.security.auth.login.config=kafka_client_jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=true -Djava.security.debug=logincontext,gssloginconfig,configfile,configparser, -jar adds-producer.jar
[Policy Parser]: creating policy entry for expanded java.ext.dirs path:
file:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.51-0.b16.el6_6.x86_64/jre/lib/ext/*
[Policy Parser]: creating policy entry for expanded java.ext.dirs path:
file:/usr/java/packages/lib/ext/*
14:44:56.520 [main] INFO c.h.adds.producer.addsProducer$ - Producer Configuration is : [email protected]
14:44:56.523 [main] INFO c.h.adds.producer.addsProducer$ - Sending message : PPOadds(PIC_EVENT,01/06/2016,26/10/2016,ASSIGNED,asd_asdasd_ERRORED,asd,asdMSR,High,CASE-3,CASE-4,CASE-1,CASE-2,,CustomAttributes(GCAL,PTS Only,,DTF_INT_WORKFLOWS_COMPLETE,16065763,2495921,12,CreditDefaultSwap,CreditDefaultSwap,VERIFIED_asdasd,ABCUSA,asdCDS))
14:44:56.524 [main] INFO c.h.adds.producer.addsProducer$ - KafkaProducer Send first fetching Partitions for topics
configfile: reading file:/home/asdasd-asdasdds-adds-asda/adasd-erer/hfgh/kafka_client_jaas.conf
configparser: Reading next config entry: KafkaClient
configparser: com.sun.security.auth.module.Krb5LoginModule, REQUIRED
configparser: [email protected]
configparser: debug=true
configparser: doNotPrompt=true
configparser: keyTab=/home/asdasd-asdad-adds-rrewr/.sdfsf-sdfsd-adds-sdfsf.headless.keytab
configparser: client=true
configparser: useKeyTab=true
configparser: useTicketCache=false
configparser: serviceName=kafka
configparser: Reading next config entry: Client
configparser: com.sun.security.auth.module.Krb5LoginModule, REQUIRED
configparser: [email protected]
configparser: debug=true
configparser: doNotPrompt=true
configparser: keyTab=/home/sdfdsf-sfds-adds-sdf/.sdff.sdsfs-adds-usdfs.headless.keytab
configparser: useKeyTab=true
configparser: useTicketCache=false
configparser: serviceName=zookeeper
Debug is true storeKey false useTicketCache false useKeyTab true doNotPrompt true ticketCache is null isInitiator true KeyTab is /home/dasda-sasd-adds-asdad/.asdad-asd-adds-adsasd.headless.keytab refreshKrb5Config is false principal is [email protected] tryFirstPass is false useFirstPass is false storePass is false clearPass is false
principal is [email protected]
Will use keytab
[LoginContext]: login success
Commit Succeeded
[LoginContext]: commit success
14:44:56.748 [main] WARN o.a.k.c.producer.ProducerConfig - The configuration 'zookeeper.connect' was supplied but isn't a known config.
adds in thread "main" org.apache.kafka.common.errors.Timeoutadds: Failed to update metadata after 60000 ms.
請讓我知道如果ia我做錯了什麼。 謝謝, 馬亨德拉Tonape