2016-03-11 94 views
1

發送錯誤,同時發送消息給kerberosed環境中的卡夫卡主題。我們有集羣HDP 2.3卡夫卡爪哇生產者與kerberos

我跟着這個http://henning.kropponline.de/2016/02/21/secure-kafka-java-producer-with-kerberos/

但用於發送郵件,我必須做明確的kinit第一,那麼只有我能夠將消息發送到卡夫卡的話題。 我試圖通過java類來編織,但那也行不通。 PFB代碼:

package com.ct.test.kafka; 

import java.util.Date; 
import java.util.Properties; 
import java.util.Random; 

import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

public class TestProducer { 

    public static void main(String[] args) { 

     String principalName = "ctadmin"; 
     String keyTabPath = "/etc/security/keytabs/ctadmin.keytab"; 
     boolean authStatus = CTSecurityUtil.loginUserFromKeytab(principalName, keyTabPath); 

     if (!authStatus) { 
      System.out.println("Authntication fails, try something else " + authStatus); 
     } else { 
      System.out.println("Authntication successfull " + authStatus); 
     } 

     System.setProperty("java.security.krb5.conf", "/etc/krb5.conf"); 
     System.setProperty("java.security.auth.login.config", "/etc/kafka/2.3.4.0-3485/0/kafka_jaas.conf"); 
     System.setProperty("javax.security.auth.useSubjectCredsOnly", "false"); 
     System.setProperty("sun.security.krb5.debug", "true"); 

     try { 
      long events = Long.parseLong("3"); 
      Random rnd = new Random(); 

      Properties props = new Properties(); 
      System.out.println("After broker list- " + args[0]); 

      props.put("metadata.broker.list", args[0]); 
      props.put("serializer.class", "kafka.serializer.StringEncoder"); 
      props.put("request.required.acks", "1"); 
      props.put("security.protocol", "PLAINTEXTSASL"); 

      //props.put("partitioner.class", "com.ct.test.kafka.SimplePartitioner"); 


      System.out.println("After config prop -1"); 

      ProducerConfig config = new ProducerConfig(props); 

      System.out.println("After config prop -2 config" + config); 

      Producer<String, String> producer = new Producer<String, String>(config); 

      System.out.println("After config prop -3"); 

      for (long nEvents = 0L; nEvents < events; nEvents += 1L) { 
       Date runtime = new Date(); 
       String ip = "192.168.2" + rnd.nextInt(255); 
       String msg = runtime + " www.example.com, " + ip; 
       KeyedMessage<String, String> data = new KeyedMessage<String, String>("test_march4", ip, msg); 

       System.out.println("After config prop -1 data" + data); 

       producer.send(data); 
      } 
      producer.close(); 

     } catch (Throwable th) { 
      th.printStackTrace(); 

     } 
    } 
} 

pom.xml的:所有的依賴性和hortonworks回購下載。

 <dependencies> 
      <dependency> 
       <groupId>org.apache.kafka</groupId> 
       <artifactId>kafka_2.10</artifactId> 
       <version>0.9.0.2.3.4.0-3485</version> 
      </dependency> 

      <dependency> 
       <groupId>org.apache.kafka</groupId> 
       <artifactId>kafka-clients</artifactId> 
       <version>0.9.0.2.3.4.0-3485</version> 
      </dependency> 

      <dependency> 
       <groupId>org.jasypt</groupId> 
       <artifactId>jasypt-spring31</artifactId> 
       <version>1.9.2</version> 
       <scope>compile</scope> 
      </dependency> 

      <dependency> 
       <groupId>org.apache.hadoop</groupId> 
       <artifactId>hadoop-common</artifactId> 
       <version>2.7.1.2.3.4.0-3485</version> 
      </dependency> 

     </dependencies> 

錯誤: 案例1:當我指定爲myuser kafka_jass.conf

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties). 
log4j:WARN Please initialize the log4j system properly. 
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 
After config prop -2 [email protected] 
java.lang.SecurityException: Configuration Error: 
     Line 6: expected [controlFlag] 
     at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:110) 
     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) 
     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 
     at java.lang.reflect.Constructor.newInstance(Constructor.java:526) 
     at java.lang.Class.newInstance(Class.java:379) 
     at javax.security.auth.login.Configuration$2.run(Configuration.java:258) 
     at javax.security.auth.login.Configuration$2.run(Configuration.java:250) 
     at java.security.AccessController.doPrivileged(Native Method) 
     at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:249) 
     at org.apache.kafka.common.security.kerberos.Login.login(Login.java:291) 
     at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104) 
     at kafka.common.security.LoginManager$.init(LoginManager.scala:36) 
     at kafka.producer.Producer.<init>(Producer.scala:50) 
     at kafka.producer.Producer.<init>(Producer.scala:73) 
     at kafka.javaapi.producer.Producer.<init>(Producer.scala:26) 
     at com.ct.test.kafka.TestProducer.main(TestProducer.java:51) 
Caused by: java.io.IOException: Configuration Error: 
     Line 6: expected [controlFlag] 
     at com.sun.security.auth.login.ConfigFile.match(ConfigFile.java:563) 
     at com.sun.security.auth.login.ConfigFile.parseLoginEntry(ConfigFile.java:413) 
     at com.sun.security.auth.login.ConfigFile.readConfig(ConfigFile.java:383) 
     at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:283) 
     at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:219) 
     at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:108) 

MyUser_Kafka_jass.conf

KafkaClient { 
    com.sun.security.auth.module.Krb5LoginModule required 
    doNotPrompt=true 
    useTicketCache=true 
    renewTicket=true 
    principal="ctadmin/[email protected]"; 
    useKeyTab=true 
    serviceName="kafka" 
    keyTab="/etc/security/keytabs/ctadmin.keytab" 
    client=true; 
}; 
Client { 
    com.sun.security.auth.module.Krb5LoginModule required 
    useKeyTab=true 
    keyTab="/etc/security/keytabs/ctadmin.keytab" 
    storeKey=true 
    useTicketCache=true 
    serviceName="zookeeper" 
    principal="ctadmin/[email protected]"; 
}; 

案例2:當我指定Kafkas自己的JAAS文件

Java config name: /etc/krb5.conf 
Loaded from Java config 
javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. Make sure -Djava.security.auth.login.config property passed to JVM and the client is configured to use a ticket cache (using the JAAS configuration setting 'useTicketCache=true)'. Make sure you are using FQDN of the Kafka broker you are trying to connect to. not available to garner authentication information from the user 
     at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:899) 
     at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:719) 
     at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:584) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at javax.security.auth.login.LoginContext.invoke(LoginContext.java:762) 
     at javax.security.auth.login.LoginContext.access$000(LoginContext.java:203) 
     at javax.security.auth.login.LoginContext$4.run(LoginContext.java:690) 
     at javax.security.auth.login.LoginContext$4.run(LoginContext.java:688) 
     at java.security.AccessController.doPrivileged(Native Method) 
     at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:687) 
     at javax.security.auth.login.LoginContext.login(LoginContext.java:595) 
     at org.apache.kafka.common.security.kerberos.Login.login(Login.java:298) 
     at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104) 
     at kafka.common.security.LoginManager$.init(LoginManager.scala:36) 
     at kafka.producer.Producer.<init>(Producer.scala:50) 
     at kafka.producer.Producer.<init>(Producer.scala:73) 
     at kafka.javaapi.producer.Producer.<init>(Producer.scala:26) 
     at com.ct.test.kafka.TestProducer.main(TestProducer.java:51) 

這工作正常,如果我在運行這個應用程序之前做kinit,否則它會通過上述錯誤。 我無法在我的製作環境中執行此操作,如果有任何方法可以由我們的應用程序本身執行此操作,請幫助我。 如果您需要更多詳情,請讓我知道。

謝謝:)

回答

1

我不知道是什麼錯誤做第一次,下面的事情我又做到了,它工作正常。

首先給所有進入主題:

bin/kafka-acls.sh --add --allow-principals user:ctadmin --operation ALL --topic marchTesting --authorizer-properties zookeeper.connect={hostname}:2181 

創建JASS文件: 卡夫卡的Jaas.conf

KafkaClient { 
com.sun.security.auth.module.Krb5LoginModule required 
doNotPrompt=true 
useTicketCache=true 
principal="[email protected]" 
useKeyTab=true 
serviceName="kafka" 
keyTab="/etc/security/keytabs/ctadmin.keytab" 
client=true; 
}; 

Java程序:

package com.ct.test.kafka; 

import java.util.Date; 
import java.util.Properties; 

import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

public class KafkaProducer { 

    public static void main(String[] args) { 
     String topic = args[0]; 

     Properties props = new Properties(); 
     props.put("metadata.broker.list", "{Hostname}:6667"); 
     props.put("serializer.class", "kafka.serializer.StringEncoder"); 
     props.put("request.required.acks", "1"); 
     props.put("security.protocol", "PLAINTEXTSASL"); 

     ProducerConfig config = new ProducerConfig(props); 
     Producer<String, String> producer = new Producer<String, String>(config); 

     for (int i = 0; i < 10; i++){ 
      producer.send(new KeyedMessage<String, String>(topic, "Test Date: " + new Date())); 
     } 
    } 
} 

運行應用程序:

java -Djava.security.auth.login.config =/home/ctadmin/kafka-jaas.conf -Djava.security.krb5.conf = /etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly = true -cp kafka-testing-0.0.1-jar -with-dependencies.jar com.ct.test.kafka.KafkaProducer

+0

異常 'ERROR Utils $:106 - 從代理[ArrayBuffer(id:0,host:D-9539.mydomain)獲取主題[Set(test3)]的主題元數據。kafka.common.KafkaException:從broker [ArrayBuffer(id:0,host:D-9539.mydomain.com,port:6667)]獲取主題[Set(test3)]的主題元數據]失敗 \t在kafka.client.ClientUtils $ .fetchTopicMetadata(ClientUtils.scala:72) \t在kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)在kafka.producer.async.DefaultEventHandler $$ anonfun $ \t處理$ 1.apply $ mcV $ sp(DefaultEventHandler.scala:67)' –

+0

我正在努力編寫一個Java生產者客戶端來連接與Kerberos啓用的Kafka生產者。我得到KrbException:標識符不符合期望值(906)錯誤。 – user2435082

1

錯誤是在你的jaas文件中有一個分號,你可以在這段輸出中看到:

Line 6: expected [controlFlag] 

這條線不能有分號:

principal="ctadmin/[email protected]"; 

它只能在最後一行存在:

相關問題