0
我試圖從logstash使用IBM Message Hub。我logstash.conf:連接到IBM Message Hub的kafka輸入插件發生不可恢復的錯誤
input {
kafka {
bootstrap_servers => "kafka04-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka05-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka01-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka03-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka02-prod01.messagehub.services.eu-de.bluemix.net:9093"
security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
jaas_path => "kafka_jaas.conf"
ssl => true
topics => [
"transactions_load"
]
}
}
output {
stdout { }
}
基礎上Message Hub Consumer example,我錯過下面的卡夫卡配置屬性:
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS
但是,我無法從logstash documentation看看如何設置這些。
我kafka_jaas.conf文件看起來像這樣:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
serviceName="kafka"
username="****"
password="****";
};
結果
[2017-10-08T10:00:48,325][ERROR][logstash.inputs.kafka ] Unable to create Kafka consumer from given configuration {:kafka_error_message=>java.lang.NullPointerException, :cause=>nil}
....
[2017-10-08T10:00:52,717][ERROR][logstash.pipeline ] A plugin had an unrecoverable error. Will restart this plugin.
Plugin: <LogStash::Inputs::Kafka bootstrap_servers=>"kafka04-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka05-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka01-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka03-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka02-prod01.messagehub.services.eu-de.bluemix.net:9093",
security_protocol=>"SASL_SSL",
sasl_mechanism=>"PLAIN",
jaas_path=>"kafka_jaas.conf",
ssl=>true,
topics=>["transactions_load"],
id=>"xxxxx",
enable_metric=>true,
codec=><LogStash::Codecs::Plain id=>"xxxxx",
enable_metric=>true,
charset=>"UTF-8">,
auto_commit_interval_ms=>"5000",
client_id=>"logstash",
consumer_threads=>1,
enable_auto_commit=>"true",
group_id=>"logstash",
key_deserializer_class=>
"org.apache.kafka.common.serialization.StringDeserializer",
value_deserializer_class=>
"org.apache.kafka.common.serialization.StringDeserializer",
poll_timeout_ms=>100,
decorate_events=>false>
Error: uncaught throw in thread 0x301b0
它可以簡單地複製我的問題,如果你有一個信息中心帳戶:
wget https://artifacts.elastic.co/downloads/logstash/logstash-5.4.2.tar.gz
tar xvzf logstash-5.4.2.tar.gz
# create logstash.conf from above
# create kafka_jaas.conf from above
./logstash-5.4.2/bin/logstash -f logstash.conf
(也是同樣的問題與最新的logstash,目前5.6.2)