2017-06-12

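Sending data to a Kerberos-enabled Kafka cluster from a Scala client

I am writing a Kafka producer in Scala, and I want to send messages from the Scala Kafka client to the Kafka broker. The problem is that the broker never receives the messages; I verified this by starting a Kafka consumer from the command line. The Kafka console producer and consumer work fine from the command prompt. The Kafka cluster has Kerberos and SASL_PLAINTEXT security enabled.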

Please find my conf file, client code, and application log below. I think there must be a problem with how the code connects to Kerberos.

Scala client:

package com.ABC.adds.producer 

import akka.actor.ActorSystem 
import akka.kafka.ProducerSettings 
import akka.kafka.scaladsl.Producer 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.Source 
import com.ABC.adds.models.Models.GMMOfaq 
import com.ABC.adds.producer.serializer.ModelSerializer 
import com.thoughtworks.xstream.XStream 
import com.thoughtworks.xstream.io.xml.DomDriver 
import com.typesafe.config.ConfigFactory 
import com.typesafe.scalalogging.LazyLogging 
import org.apache.kafka.clients.CommonClientConfigs 
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} 
import org.apache.kafka.common.serialization.ByteArraySerializer 

import scala.concurrent.ExecutionContext.Implicits.global 
import scala.util.{Failure, Success} 

object faqProducer extends App with LazyLogging{ 

    val config = ConfigFactory.load() 
    implicit val system = ActorSystem.create("adds-faq-producer", config) 
    implicit val mat = ActorMaterializer() 

    val producerSettings = ProducerSettings(system, new ByteArraySerializer, new ModelSerializer[PPOfaq](classOf[PPOfaq])) 
    .withBootstrapServers("jbt12324.systems.pfk.ABC:3382") 
    .withProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT") 
     .withProperty("zookeeper.connect","jbt234234.systems.pfk.ABC:2341,jbt234.systems.pfk.ABC:234,jbt1234324.systems.pfk.ABC:2234") 
     .withProperty(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, "1") 

    val xstream = new XStream(new DomDriver) 
    val personString: String = scala.io.Source.fromInputStream(getClass().getClassLoader().getResourceAsStream("PPOfaq.xml")).mkString 
    xstream.alias("faq", classOf[PPOfaq]) 
    val ppofaq: PPOfaq = xstream.fromXML(personString).asInstanceOf[PPOfaq] 

    logger.info(s"Producer Configuration is : {} ", producerSettings.toString) 
    logger.info(s"Sending message : {}", ppofaq) 

    logger.info("KafkaProducer Send first fetching Partitions for topics") 
    val kafkaProducer = producerSettings.createKafkaProducer() 
    kafkaProducer.partitionsFor("asp.adds.ppo.pems") 
    val done1 = kafkaProducer.send(new ProducerRecord[Array[Byte], PPOfaq]("asp.adds.ppo.pems", ppofaq)) 
    val recordMetaData : RecordMetadata = done1.get() 

    logger.info("Topic is : " + recordMetaData.topic() +" partition is : "+ recordMetaData.partition() +" offset is : "+ recordMetaData.offset()) 

    logger.info("KafkaProdcuer Send first fetching Partitions for topics end") 

    val done = Source.single(ppofaq) 
    .map { elem => 
     new ProducerRecord[Array[Byte], PPOfaq]("asp.adds.ppo.pems", ppofaq) 
    } 
    .runWith(Producer.plainSink(producerSettings)) 

    done onComplete { 
    case Success(s) => { 
    logger.info(s"The producer has sent a message to the topic: asp.adds.ppo.pems!!") 
    } 
    case Failure(e) => { 
    logger.error("Erorr occured while producing Topic", e) 
    e.printStackTrace() 
    e.fillInStackTrace() 
    e.getCause 
    e.getMessage 
    } 
} 
} 

This is the Kafka client JAAS conf file I use for Kerberos authentication:

KafkaClient { 
com.sun.security.auth.module.Krb5LoginModule required 
doNotPrompt=true 
useTicketCache=false 
useKeyTab=true 
serviceName="kafka" 
principal="[email protected]" 
keyTab="/home/pqr/.pqr.headless.keytab" 
debug=true 
client=true; 
}; 
Client { 
    com.sun.security.auth.module.Krb5LoginModule required 
    doNotPrompt=true 
    useKeyTab=true 
    useTicketCache=false 
    serviceName="zookeeper" 
    principal="[email protected]" 
    keyTab="/home/pqr/.pqr.headless.keytab" 
    debug=true; 
}; 

This is the application log I get when I run my jar on the cluster:

[[email protected] ResourceBundle]$ java -Djava.security.auth.login.config=kafka_client_jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=true -Djava.security.debug=logincontext,gssloginconfig,configfile,configparser, -jar adds-producer.jar 
      [Policy Parser]: creating policy entry for expanded java.ext.dirs path: 
        file:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.51-0.b16.el6_6.x86_64/jre/lib/ext/* 
      [Policy Parser]: creating policy entry for expanded java.ext.dirs path: 
        file:/usr/java/packages/lib/ext/* 
    14:44:56.520 [main] INFO c.h.adds.producer.addsProducer$ - Producer Configuration is : [email protected] 
    14:44:56.523 [main] INFO c.h.adds.producer.addsProducer$ - Sending message : PPOadds(PIC_EVENT,01/06/2016,26/10/2016,ASSIGNED,asd_asdasd_ERRORED,asd,asdMSR,High,CASE-3,CASE-4,CASE-1,CASE-2,,CustomAttributes(GCAL,PTS Only,,DTF_INT_WORKFLOWS_COMPLETE,16065763,2495921,12,CreditDefaultSwap,CreditDefaultSwap,VERIFIED_asdasd,ABCUSA,asdCDS)) 
    14:44:56.524 [main] INFO c.h.adds.producer.addsProducer$ - KafkaProducer Send first fetching Partitions for topics 
    configfile: reading file:/home/asdasd-asdasdds-adds-asda/adasd-erer/hfgh/kafka_client_jaas.conf 
    configparser: Reading next config entry: KafkaClient 
    configparser:   com.sun.security.auth.module.Krb5LoginModule, REQUIRED 
    configparser:     [email protected] 
    configparser:     debug=true 
    configparser:     doNotPrompt=true 
    configparser:     keyTab=/home/asdasd-asdad-adds-rrewr/.sdfsf-sdfsd-adds-sdfsf.headless.keytab 
    configparser:     client=true 
    configparser:     useKeyTab=true 
    configparser:     useTicketCache=false 
    configparser:     serviceName=kafka 
    configparser: Reading next config entry: Client 
    configparser:   com.sun.security.auth.module.Krb5LoginModule, REQUIRED 
    configparser:     [email protected] 
    configparser:     debug=true 
    configparser:     doNotPrompt=true 
    configparser:     keyTab=/home/sdfdsf-sfds-adds-sdf/.sdff.sdsfs-adds-usdfs.headless.keytab 
    configparser:     useKeyTab=true 
    configparser:     useTicketCache=false 
    configparser:     serviceName=zookeeper 
    Debug is true storeKey false useTicketCache false useKeyTab true doNotPrompt true ticketCache is null isInitiator true KeyTab is /home/dasda-sasd-adds-asdad/.asdad-asd-adds-adsasd.headless.keytab refreshKrb5Config is false principal is [email protected] tryFirstPass is false useFirstPass is false storePass is false clearPass is false 
    principal is [email protected] 
    Will use keytab 
      [LoginContext]: login success 
    Commit Succeeded 

      [LoginContext]: commit success 
    14:44:56.748 [main] WARN o.a.k.c.producer.ProducerConfig - The configuration 'zookeeper.connect' was supplied but isn't a known config. 
    Exception in thread "main" org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 

Please let me know if I am doing anything wrong. Thanks, Mahendra Tonape

Answer


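We could not consume the messages on the consumer side in our cluster, although we could consume them on our local machine. The cause was that we had written our application against the Kafka 0.10 API while our cluster was running Kafka 0.9. If you compare these two Kafka versions, you will find significant API differences between them.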
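As a rough illustration of the version check described above, here is a minimal Scala sketch. It is not part of the original application: the object `KafkaVersionCheck` and its methods are hypothetical helpers, and both version strings are assumed to be supplied by hand (for example, the client version from the build file and the broker version from the cluster administrator). The rule it encodes, that the client's major.minor version should not be newer than the broker's, is an assumption that fits the 0.9/0.10 releases discussed here and may not hold for later releases.

```scala
// Hypothetical helper, not part of the Kafka API: compares the
// "major.minor" prefix of a client version and a broker version.
object KafkaVersionCheck {

  /** Extracts (major, minor) from a version string such as "0.10.1.0". */
  def majorMinor(version: String): (Int, Int) = {
    val parts = version.split("\\.").take(2).map(_.toInt)
    require(parts.length == 2, s"unexpected version string: $version")
    (parts(0), parts(1))
  }

  /** Assumed rule for the 0.9/0.10 era: the client must not be newer than the broker. */
  def clientNotNewerThanBroker(client: String, broker: String): Boolean = {
    val (cMajor, cMinor) = majorMinor(client)
    val (bMajor, bMinor) = majorMinor(broker)
    cMajor < bMajor || (cMajor == bMajor && cMinor <= bMinor)
  }
}
```

Calling `KafkaVersionCheck.clientNotNewerThanBroker("0.10.1.0", "0.9.0.1")` returns `false`, which corresponds to the mismatch we hit; with a 0.9.x client against the same broker it returns `true`.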

In addition, please enable Kerberos debug logging to check whether the user is actually being authenticated against the Kerberos-enabled cluster.
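One possible way to turn on that Kerberos debug output from inside the Scala application is sketched below. The object name `KerberosDebug` is hypothetical; `sun.security.krb5.debug` and `sun.security.jgss.debug` are JDK system properties, and the sketch assumes they are set before the first Kafka client is created, since they may not take effect once the Kerberos classes have been loaded.

```scala
// Hypothetical helper: switches on the JDK's Kerberos and GSS-API debug output.
object KerberosDebug {

  /** Call this at the very start of the application, before creating any producer. */
  def enable(): Unit = {
    // Prints Kerberos protocol details (KDC requests, ticket handling) to stdout.
    System.setProperty("sun.security.krb5.debug", "true")
    // Prints GSS-API details, which is the layer SASL/GSSAPI goes through.
    System.setProperty("sun.security.jgss.debug", "true")
  }
}
```

The same effect can be had without code changes by passing `-Dsun.security.krb5.debug=true -Dsun.security.jgss.debug=true` on the `java` command line, next to the `-Djava.security.auth.login.config` option already used in the question.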
