2017-01-31

I'm using Heroku Kafka, which runs 0.10.1.1 and uses SSL (they support only the most recent protocol). I'm trying to configure the SSL key/cert the Kafka client uses when connecting.

Heroku Kafka authenticates over SSL and issues a client certificate and key, along with a CA certificate. I placed these in client_cert.pem, client_key.pem, and trusted_cert.pem respectively, then ran the following to build the keystores:

openssl pkcs12 -export -in client_cert.pem -inkey client_key.pem -certfile client_cert.pem -out client.p12 
keytool -importkeystore -srckeystore client.p12 -srcstoretype pkcs12 -destkeystore kafka.keystore.jks -deststoretype JKS 
keytool -keystore kafka.truststore.jks -alias CARoot -import -file trusted_cert.pem 
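Before importing into the JKS stores, it can help to confirm that the intermediate PKCS#12 bundle actually contains the certificate. A minimal, self-contained sketch (using a throwaway self-signed pair in place of the Heroku-issued files, and xxxx as a placeholder password):

```shell
# Recreate the PKCS#12 step with a throwaway self-signed key/cert
openssl req -x509 -newkey rsa:2048 -nodes -keyout client_key.pem -out client_cert.pem \
  -days 1 -subj "/CN=test" 2>/dev/null
openssl pkcs12 -export -in client_cert.pem -inkey client_key.pem \
  -out client.p12 -passout pass:xxxx
# Listing with -nokeys should still show the certificate block
openssl pkcs12 -info -in client.p12 -passin pass:xxxx -nodes -nokeys 2>/dev/null \
  | grep -c "BEGIN CERTIFICATE"
```

If the count comes back zero, the JKS import would produce an empty keystore and the broker would reject the client, so this is worth checking before blaming the Kafka configuration.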

I then created client-ssl.properties containing the following:

ssl.protocol=SSL 
security.protocol=SSL 
ssl.truststore.location=kafka.truststore.jks 
ssl.truststore.type=JKS 
ssl.truststore.password=xxxx 
ssl.keystore.location=kafka.keystore.jks 
ssl.keystore.type=JKS 
ssl.keystore.password=xxxx 
ssl.key.password=xxxx 
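As a quick sanity check that none of the required keys are missing or misspelled, the file can be grepped for the settings an SSL client needs (a trivial sketch; xxxx stands in for the real passwords):

```shell
# Recreate client-ssl.properties and verify each required SSL setting is present
cat > client-ssl.properties <<'EOF'
security.protocol=SSL
ssl.truststore.location=kafka.truststore.jks
ssl.truststore.type=JKS
ssl.truststore.password=xxxx
ssl.keystore.location=kafka.keystore.jks
ssl.keystore.type=JKS
ssl.keystore.password=xxxx
ssl.key.password=xxxx
EOF
for key in security.protocol ssl.truststore.location ssl.truststore.password \
           ssl.keystore.location ssl.keystore.password ssl.key.password; do
  grep -q "^${key}=" client-ssl.properties && echo "ok: ${key}" || echo "MISSING: ${key}"
done
```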

I then ran kafka-console-producer (version 0.10.1.1) with the following:

kafka-console-producer --broker-list kafka+ssl://a.a.a.a:9096,kafka+ssl://b.b.b.b:9096,kafka+ssl://c.c.c.c:9096 --producer.config client-ssl.properties --topic robintest 

(The robintest topic has already been created.)

[2017-01-31 10:06:50,385] INFO ProducerConfig values: 
    acks = 1 
    batch.size = 16384 
    block.on.buffer.full = false 
    bootstrap.servers = [kafka+ssl://a.a.a.a:9096, kafka+ssl://b.b.b.b:9096, kafka+ssl://c.c.c.c:9096] 
    buffer.memory = 33554432 
    client.id = console-producer 
    compression.type = none 
    connections.max.idle.ms = 540000 
    interceptor.classes = null 
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer 
    linger.ms = 1000 
    max.block.ms = 60000 
    max.in.flight.requests.per.connection = 5 
    max.request.size = 1048576 
    metadata.fetch.timeout.ms = 60000 
    metadata.max.age.ms = 300000 
    metric.reporters = [] 
    metrics.num.samples = 2 
    metrics.sample.window.ms = 30000 
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner 
    receive.buffer.bytes = 32768 
    reconnect.backoff.ms = 50 
    request.timeout.ms = 1500 
    retries = 3 
    retry.backoff.ms = 100 
    sasl.kerberos.kinit.cmd = /usr/bin/kinit 
    sasl.kerberos.min.time.before.relogin = 60000 
    sasl.kerberos.service.name = null 
    sasl.kerberos.ticket.renew.jitter = 0.05 
    sasl.kerberos.ticket.renew.window.factor = 0.8 
    sasl.mechanism = GSSAPI 
    security.protocol = SSL 
    send.buffer.bytes = 102400 
    ssl.cipher.suites = null 
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
    ssl.endpoint.identification.algorithm = null 
    ssl.key.password = [hidden] 
    ssl.keymanager.algorithm = SunX509 
    ssl.keystore.location = kafka.keystore.jks 
    ssl.keystore.password = [hidden] 
    ssl.keystore.type = JKS 
    ssl.protocol = SSL 
    ssl.provider = null 
    ssl.secure.random.implementation = null 
    ssl.trustmanager.algorithm = PKIX 
    ssl.truststore.location = kafka.truststore.jks 
    ssl.truststore.password = [hidden] 
    ssl.truststore.type = JKS 
    timeout.ms = 30000 
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer 
(org.apache.kafka.clients.producer.ProducerConfig) 
[2017-01-31 10:06:50,390] INFO ProducerConfig values: 
    acks = 1 
    batch.size = 16384 
    block.on.buffer.full = false 
    bootstrap.servers = [kafka+ssl://a.a.a.a:9096, kafka+ssl://b.b.b.b:9096, kafka+ssl://c.c.c.c:9096] 
    buffer.memory = 33554432 
    client.id = console-producer 
    compression.type = none 
    connections.max.idle.ms = 540000 
    interceptor.classes = null 
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer 
    linger.ms = 1000 
    max.block.ms = 60000 
    max.in.flight.requests.per.connection = 5 
    max.request.size = 1048576 
    metadata.fetch.timeout.ms = 60000 
    metadata.max.age.ms = 300000 
    metric.reporters = [] 
    metrics.num.samples = 2 
    metrics.sample.window.ms = 30000 
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner 
    receive.buffer.bytes = 32768 
    reconnect.backoff.ms = 50 
    request.timeout.ms = 1500 
    retries = 3 
    retry.backoff.ms = 100 
    sasl.kerberos.kinit.cmd = /usr/bin/kinit 
    sasl.kerberos.min.time.before.relogin = 60000 
    sasl.kerberos.service.name = null 
    sasl.kerberos.ticket.renew.jitter = 0.05 
    sasl.kerberos.ticket.renew.window.factor = 0.8 
    sasl.mechanism = GSSAPI 
    security.protocol = SSL 
    send.buffer.bytes = 102400 
    ssl.cipher.suites = null 
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
    ssl.endpoint.identification.algorithm = null 
    ssl.key.password = [hidden] 
    ssl.keymanager.algorithm = SunX509 
    ssl.keystore.location = kafka.keystore.jks 
    ssl.keystore.password = [hidden] 
    ssl.keystore.type = JKS 
    ssl.protocol = SSL 
    ssl.provider = null 
    ssl.secure.random.implementation = null 
    ssl.trustmanager.algorithm = PKIX 
    ssl.truststore.location = kafka.truststore.jks 
    ssl.truststore.password = [hidden] 
    ssl.truststore.type = JKS 
    timeout.ms = 30000 
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer 
(org.apache.kafka.clients.producer.ProducerConfig) 
[2017-01-31 10:06:50,396] DEBUG Added sensor with name bufferpool-wait-time (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:50,398] DEBUG Added sensor with name buffer-exhausted-records (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:50,399] DEBUG Updated cluster metadata version 1 to Cluster(id = null, nodes = [b.b.b.b:9096 (id: -2 rack: null), c.c.c.c:9096 (id: -3 rack: null), a.a.a.a:9096 (id: -1 rack: null)], partitions = []) (org.apache.kafka.clients.Metadata) 
[2017-01-31 10:06:50,457] DEBUG Added sensor with name connections-closed: (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:50,457] DEBUG Added sensor with name connections-created: (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:50,457] DEBUG Added sensor with name bytes-sent-received: (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:50,457] DEBUG Added sensor with name bytes-sent: (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:50,458] DEBUG Added sensor with name bytes-received: (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:50,458] DEBUG Added sensor with name select-time: (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:50,459] DEBUG Added sensor with name io-time: (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:50,462] DEBUG Added sensor with name batch-size (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:50,462] DEBUG Added sensor with name compression-rate (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:50,462] DEBUG Added sensor with name queue-time (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:50,462] DEBUG Added sensor with name request-time (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:50,464] DEBUG Added sensor with name produce-throttle-time (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:50,465] DEBUG Added sensor with name records-per-request (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:50,465] DEBUG Added sensor with name record-retries (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:50,465] DEBUG Added sensor with name errors (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:50,465] DEBUG Added sensor with name record-size-max (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:50,467] DEBUG Starting Kafka producer I/O thread. (org.apache.kafka.clients.producer.internals.Sender) 
[2017-01-31 10:06:50,468] INFO Kafka version : 0.10.1.1 (org.apache.kafka.common.utils.AppInfoParser) 
[2017-01-31 10:06:50,468] INFO Kafka commitId : f10ef2720b03b247 (org.apache.kafka.common.utils.AppInfoParser) 
[2017-01-31 10:06:50,468] DEBUG Kafka producer started (org.apache.kafka.clients.producer.KafkaProducer) 

At this point I type a record and press Enter.

[2017-01-31 10:06:53,194] DEBUG Initialize connection to node -2 for sending metadata request (org.apache.kafka.clients.NetworkClient) 
[2017-01-31 10:06:53,194] DEBUG Initiating connection to node -2 at b.b.b.b:9096. (org.apache.kafka.clients.NetworkClient) 
[2017-01-31 10:06:53,457] DEBUG Added sensor with name node--2.bytes-sent (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:53,457] DEBUG Added sensor with name node--2.bytes-received (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:53,458] DEBUG Added sensor with name node--2.latency (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:53,460] DEBUG Created socket with SO_RCVBUF = 33304, SO_SNDBUF = 102808, SO_TIMEOUT = 0 to node -2 (org.apache.kafka.common.network.Selector) 
[2017-01-31 10:06:53,463] DEBUG Completed connection to node -2 (org.apache.kafka.clients.NetworkClient) 
[2017-01-31 10:06:53,692] DEBUG Sending metadata request {topics=[robintest]} to node -2 (org.apache.kafka.clients.NetworkClient) 
[2017-01-31 10:06:53,724] DEBUG Connection with ec2-34-194-25-39.compute-1.amazonaws.com/b.b.b.b disconnected (org.apache.kafka.common.network.Selector) 
java.io.EOFException 
    at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:488) 
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) 
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) 
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154) 
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135) 
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:343) 
    at org.apache.kafka.common.network.Selector.poll(Selector.java:291) 
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) 
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236) 
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135) 
    at java.lang.Thread.run(Thread.java:745) 
[2017-01-31 10:06:53,728] DEBUG Node -2 disconnected. (org.apache.kafka.clients.NetworkClient) 
[2017-01-31 10:06:53,728] WARN Bootstrap broker b.b.b.b:9096 disconnected (org.apache.kafka.clients.NetworkClient) 
[2017-01-31 10:06:53,729] DEBUG Initialize connection to node -1 for sending metadata request (org.apache.kafka.clients.NetworkClient) 
[2017-01-31 10:06:53,729] DEBUG Initiating connection to node -1 at a.a.a.a:9096. (org.apache.kafka.clients.NetworkClient) 
[2017-01-31 10:06:53,791] DEBUG Added sensor with name node--1.bytes-sent (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:53,792] DEBUG Added sensor with name node--1.bytes-received (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:53,792] DEBUG Added sensor with name node--1.latency (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:53,792] DEBUG Created socket with SO_RCVBUF = 33304, SO_SNDBUF = 102808, SO_TIMEOUT = 0 to node -1 (org.apache.kafka.common.network.Selector) 
[2017-01-31 10:06:53,792] DEBUG Completed connection to node -1 (org.apache.kafka.clients.NetworkClient) 
[2017-01-31 10:06:53,994] DEBUG Sending metadata request {topics=[robintest]} to node -1 (org.apache.kafka.clients.NetworkClient) 
[2017-01-31 10:06:54,025] DEBUG Connection with ec2-34-194-39-35.compute-1.amazonaws.com/a.a.a.a disconnected (org.apache.kafka.common.network.Selector) 
java.io.EOFException 
    at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:488) 
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) 
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) 
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154) 
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135) 
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:343) 
    at org.apache.kafka.common.network.Selector.poll(Selector.java:291) 
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) 
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236) 
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135) 
    at java.lang.Thread.run(Thread.java:745) 
[2017-01-31 10:06:54,026] DEBUG Node -1 disconnected. (org.apache.kafka.clients.NetworkClient) 
[2017-01-31 10:06:54,026] WARN Bootstrap broker a.a.a.a:9096 disconnected (org.apache.kafka.clients.NetworkClient) 
[2017-01-31 10:06:54,027] DEBUG Initialize connection to node -3 for sending metadata request (org.apache.kafka.clients.NetworkClient) 
[2017-01-31 10:06:54,027] DEBUG Initiating connection to node -3 at c.c.c.c:9096. (org.apache.kafka.clients.NetworkClient) 
[2017-01-31 10:06:54,102] DEBUG Added sensor with name node--3.bytes-sent (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:54,103] DEBUG Added sensor with name node--3.bytes-received (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:54,103] DEBUG Added sensor with name node--3.latency (org.apache.kafka.common.metrics.Metrics) 
[2017-01-31 10:06:54,104] DEBUG Created socket with SO_RCVBUF = 33304, SO_SNDBUF = 102808, SO_TIMEOUT = 0 to node -3 (org.apache.kafka.common.network.Selector) 
[2017-01-31 10:06:54,104] DEBUG Completed connection to node -3 (org.apache.kafka.clients.NetworkClient) 
[2017-01-31 10:06:54,309] DEBUG Sending metadata request {topics=[robintest]} to node -3 (org.apache.kafka.clients.NetworkClient) 
[2017-01-31 10:06:54,342] DEBUG Connection with ec2-34-194-45-119.compute-1.amazonaws.com/c.c.c.c disconnected (org.apache.kafka.common.network.Selector) 
java.io.EOFException 
    at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:488) 
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) 
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) 
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154) 
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135) 
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:343) 
    at org.apache.kafka.common.network.Selector.poll(Selector.java:291) 
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) 
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236) 
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135) 
    at java.lang.Thread.run(Thread.java:745) 
[2017-01-31 10:06:54,342] DEBUG Node -3 disconnected. (org.apache.kafka.clients.NetworkClient) 
[2017-01-31 10:06:54,343] WARN Bootstrap broker c.c.c.c:9096 disconnected (org.apache.kafka.clients.NetworkClient) 
[2017-01-31 10:06:54,343] DEBUG Initialize connection to node -1 for sending metadata request (org.apache.kafka.clients.NetworkClient) 
[2017-01-31 10:06:54,343] DEBUG Initiating connection to node -1 at a.a.a.a:9096. (org.apache.kafka.clients.NetworkClient) 
[2017-01-31 10:06:54,348] DEBUG Initialize connection to node -2 for sending metadata request (org.apache.kafka.clients.NetworkClient) 
[2017-01-31 10:06:54,348] DEBUG Initiating connection to node -2 at b.b.b.b:9096. (org.apache.kafka.clients.NetworkClient) 
[2017-01-31 10:06:54,376] DEBUG Created socket with SO_RCVBUF = 33304, SO_SNDBUF = 102808, SO_TIMEOUT = 0 to node -2 (org.apache.kafka.common.network.Selector) 
[2017-01-31 10:06:54,377] DEBUG Completed connection to node -2 (org.apache.kafka.clients.NetworkClient) 
[2017-01-31 10:06:54,379] DEBUG Created socket with SO_RCVBUF = 33304, SO_SNDBUF = 102808, SO_TIMEOUT = 0 to node -1 (org.apache.kafka.common.network.Selector) 
[2017-01-31 10:06:54,379] DEBUG Completed connection to node -1 (org.apache.kafka.clients.NetworkClient) 

These entries go on forever, until I kill the process.

I've tried every combination of configuration I can think of, including prefixing all of the properties in the properties file with producer., removing the configuration entirely (which didn't seem to make any difference), and setting the passwords to incorrect values (which also didn't seem to make any difference). I also tried connecting to a different provider (www.cloudkarafka.com) with their credentials, and got the same results. So it definitely looks like a configuration problem.
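One way to separate TLS problems from Kafka protocol problems is to test the handshake directly with openssl s_client. Against the real cluster that would be openssl s_client -connect a.a.a.a:9096 -cert client_cert.pem -key client_key.pem -CAfile trusted_cert.pem; the self-contained sketch below runs the same kind of probe against a throwaway local s_server instead:

```shell
# Start a throwaway local TLS server, then verify the handshake with s_client
openssl req -x509 -newkey rsa:2048 -nodes -keyout srv.key -out srv.crt \
  -days 1 -subj "/CN=localhost" 2>/dev/null
openssl s_server -accept 19096 -cert srv.crt -key srv.key -quiet &
SERVER=$!
sleep 1
# "Verify return code: 0 (ok)" means the certificate chain checked out
result=$(echo | openssl s_client -connect localhost:19096 -CAfile srv.crt 2>/dev/null \
  | grep "Verify return code")
echo "$result"
kill $SERVER 2>/dev/null
```

If the handshake succeeds against the broker but the Kafka client still gets EOFException, the certificates are probably fine and the problem lies elsewhere.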

Answer


It turns out that my Kafka cluster (a Heroku add-on) wasn't actually running 0.10.1.1; it was running 0.10.0.1. The two apparently have incompatible consumer APIs. (I have to say: this is why semantic versioning exists.)


To upgrade Kafka running on Heroku, use heroku kafka:upgrade --version 0.10, which upgrades it to the latest 0.10.x release. So if you're on 0.9 and you want 0.10.0.1, good luck.
