2017-10-08 46 views
0

我試圖從logstash使用IBM Message Hub。我logstash.conf:連接到IBM Message Hub的kafka輸入插件發生不可恢復的錯誤

input { 
    kafka { 
     bootstrap_servers => "kafka04-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka05-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka01-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka03-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka02-prod01.messagehub.services.eu-de.bluemix.net:9093" 
     security_protocol => "SASL_SSL" 
     sasl_mechanism => "PLAIN" 
     jaas_path => "kafka_jaas.conf" 
     ssl => true 
     topics => [ 
       "transactions_load" 
      ] 
    } 
} 
output { 
    stdout { } 
} 

基礎上Message Hub Consumer example,我錯過下面的卡夫卡配置屬性:

ssl.protocol=TLSv1.2 
ssl.enabled.protocols=TLSv1.2 
ssl.endpoint.identification.algorithm=HTTPS 

但是,我無法從logstash documentation看看如何設置這些。

我kafka_jaas.conf文件看起來像這樣:

KafkaClient { 
    org.apache.kafka.common.security.plain.PlainLoginModule required 
    serviceName="kafka" 
    username="****" 
    password="****"; 
    }; 

結果

[2017-10-08T10:00:48,325][ERROR][logstash.inputs.kafka ] Unable to create Kafka consumer from given configuration {:kafka_error_message=>java.lang.NullPointerException, :cause=>nil} 
.... 
[2017-10-08T10:00:52,717][ERROR][logstash.pipeline  ] A plugin had an unrecoverable error. Will restart this plugin. 
    Plugin: <LogStash::Inputs::Kafka bootstrap_servers=>"kafka04-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka05-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka01-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka03-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka02-prod01.messagehub.services.eu-de.bluemix.net:9093", 
security_protocol=>"SASL_SSL", 
sasl_mechanism=>"PLAIN", 
jaas_path=>"kafka_jaas.conf", 
ssl=>true, 
topics=>["transactions_load"], 
id=>"xxxxx", 
enable_metric=>true, 
codec=><LogStash::Codecs::Plain id=>"xxxxx", 
enable_metric=>true, 
charset=>"UTF-8">, 
auto_commit_interval_ms=>"5000", 
client_id=>"logstash", 
consumer_threads=>1, 
enable_auto_commit=>"true", 
group_id=>"logstash", 
key_deserializer_class=> 
    "org.apache.kafka.common.serialization.StringDeserializer", 
value_deserializer_class=> 
    "org.apache.kafka.common.serialization.StringDeserializer", 
poll_timeout_ms=>100, 
decorate_events=>false> 
    Error: uncaught throw in thread 0x301b0 

它可以簡單地複製我的問題,如果你有一個信息中心帳戶:

wget https://artifacts.elastic.co/downloads/logstash/logstash-5.4.2.tar.gz 
tar xvzf logstash-5.4.2.tar.gz 
# create logstash.conf from above 
# create kafka_jaas.conf from above 
./logstash-5.4.2/bin/logstash -f logstash.conf 

(也是同樣的問題與最新的logstash,目前5.6.2)

回答

1

對我來說,解決辦法是:

# replace old kafka consumer jar files 

rm logstash-5.4.2/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.1.7/vendor/jar-dependencies/runtime-jars/kafka-clients-0.10.0.1.jar 

cp ~/.m2/repository/org/apache/kafka/kafka-clients/0.10.2.1/kafka-clients-0.10.2.1.jar logstash-5.4.2/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.1.7/vendor/jar-dependencies/runtime-jars/ 

然後編輯kafka.rb文件:

# logstash-5.4.2/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.1.7/lib/logstash/inputs/kafka.rb 

    private 
    def create_consumer(client_id) 
    begin 
     props = java.util.Properties.new 
     kafka = org.apache.kafka.clients.consumer.ConsumerConfig 

     props.put(kafka::AUTO_COMMIT_INTERVAL_MS_CONFIG, auto_commit_interval_ms) 
     props.put(kafka::AUTO_OFFSET_RESET_CONFIG, auto_offset_reset) unless auto_offset_reset.nil? 
     props.put(kafka::BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers) 
     props.put(kafka::CHECK_CRCS_CONFIG, check_crcs) unless check_crcs.nil? 
     props.put(kafka::CLIENT_ID_CONFIG, client_id) 
     props.put(kafka::CONNECTIONS_MAX_IDLE_MS_CONFIG, connections_max_idle_ms) unless connections_max_idle_ms.nil? 
     props.put(kafka::ENABLE_AUTO_COMMIT_CONFIG, enable_auto_commit) 
     props.put(kafka::EXCLUDE_INTERNAL_TOPICS_CONFIG, exclude_internal_topics) unless exclude_internal_topics.nil? 
     props.put(kafka::FETCH_MAX_WAIT_MS_CONFIG, fetch_max_wait_ms) unless fetch_max_wait_ms.nil? 
     props.put(kafka::FETCH_MIN_BYTES_CONFIG, fetch_min_bytes) unless fetch_min_bytes.nil? 
     props.put(kafka::GROUP_ID_CONFIG, group_id) 
     props.put(kafka::HEARTBEAT_INTERVAL_MS_CONFIG, heartbeat_interval_ms) unless heartbeat_interval_ms.nil? 
     props.put(kafka::KEY_DESERIALIZER_CLASS_CONFIG, key_deserializer_class) 
     props.put(kafka::MAX_PARTITION_FETCH_BYTES_CONFIG, max_partition_fetch_bytes) unless max_partition_fetch_bytes.nil? 
     props.put(kafka::MAX_POLL_RECORDS_CONFIG, max_poll_records) unless max_poll_records.nil? 
     props.put(kafka::METADATA_MAX_AGE_MS_CONFIG, metadata_max_age_ms) unless metadata_max_age_ms.nil? 
     props.put(kafka::PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partition_assignment_strategy) unless partition_assignment_strategy.nil? 
     props.put(kafka::RECEIVE_BUFFER_CONFIG, receive_buffer_bytes) unless receive_buffer_bytes.nil? 
     props.put(kafka::RECONNECT_BACKOFF_MS_CONFIG, reconnect_backoff_ms) unless reconnect_backoff_ms.nil? 
     props.put(kafka::REQUEST_TIMEOUT_MS_CONFIG, request_timeout_ms) unless request_timeout_ms.nil? 
     props.put(kafka::RETRY_BACKOFF_MS_CONFIG, retry_backoff_ms) unless retry_backoff_ms.nil? 
     props.put(kafka::SEND_BUFFER_CONFIG, send_buffer_bytes) unless send_buffer_bytes.nil? 
     props.put(kafka::SESSION_TIMEOUT_MS_CONFIG, session_timeout_ms) unless session_timeout_ms.nil? 
     props.put(kafka::VALUE_DESERIALIZER_CLASS_CONFIG, value_deserializer_class) 

     props.put("security.protocol", 'SASL_SSL') 
     props.put("sasl.mechanism", 'PLAIN') 
     props.put("ssl.protocol", "TLSv1.2") 
     props.put("ssl.enabled.protocols", "TLSv1.2") 
     props.put("ssl.endpoint.identification.algorithm", "HTTPS") 
     props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"****\" password=\"****\";") 

     # if security_protocol == "SSL" 
     # set_trustore_keystore_config(props) 
     # elsif security_protocol == "SASL_PLAINTEXT" 
     # set_sasl_config(props) 
     # elsif security_protocol == "SASL_SSL" 
     # set_trustore_keystore_config(props) 
     # set_sasl_config(props) 
     # end 

     org.apache.kafka.clients.consumer.KafkaConsumer.new(props) 
    rescue => e 
     logger.error("Unable to create Kafka consumer from given configuration", 
        :kafka_error_message => e, 
        :cause => e.respond_to?(:getCause) ? e.getCause() : nil) 
     throw e 
    end 
    end 

最後更新我的輸入配置:

input { 
    kafka { 
     bootstrap_servers => "kafka04-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka05-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka01-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka03-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka02-prod01.messagehub.services.eu-de.bluemix.net:9093" 
     topics => [ 
       "transactions_load" 
      ] 
    } 
} 
相關問題