我試圖連接到IBM Bluemix消息轂和用java產生的消息之後的示例卡夫卡生產者超時問題連接到消息轂上Bluemix
https://github.com/ibm-messaging/message-hub-samples/tree/master/java/message-hub-kafka-ssl
producer.properties
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
acks=-1
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.truststore.password=changeit
ssl.truststore.type=JKS
ssl.endpoint.identification.algorithm=HTTPS
ssl.truststore.location=/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/security/cacerts
jaas.conf.template
KafkaClient {
com.ibm.messagehub.login.MessageHubLoginModule required
serviceName="kafka"
user="$USERNAME"
password="$PASSWORD";
};
Producer.java片斷
ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(
"MyTopic",
KEY.getBytes("UTF-8"),
"MESSAGE".getBytes("UTF-8"));
// Synchronously wait for a response from Message Hub/Kafka.
RecordMetadata m = kafkaProducer.send(record).get();
的問題是,我得到當我試圖讓未來RecordMetadata
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:730)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:483)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353)
閱讀先前的帖子在同一個超時異常話題
Timeout connecting to message-hub on Bluemix
,並提到了可能的原因是該主題不是created.I可以看到bluemix控制檯的話題和驗證,我稱之爲REST服務推送消息
RESTRequest restApi = new RESTRequest(getRestHost(),getApiKey());
String topics = restApi.get("/admin/topics", false);
logger.info("Topics present in the system: " + topics);
它返回之前獲取的主題列表我嘗試推送消息的主題,但我收到超時錯誤。
能有人幫我調試的問題
UPDATE
根據意見,我啓用了調試日誌,卡夫卡,這裏是日誌序列
2016-11-23 08:48:20.906 DEBUG 72885 --- [sage-hub-sample] org.apache.kafka.clients.NetworkClient : Initialize connection to node -5 for sending metadata request
2016-11-23 08:48:20.906 DEBUG 72885 --- [sage-hub-sample] org.apache.kafka.clients.NetworkClient : Initiating connection to node -5 at kafka05-prod01.messagehub.services.us-south.bluemix.net:9093.
2016-11-23 08:48:20.914 DEBUG 72885 --- [sage-hub-sample] o.a.k.c.s.a.SaslClientAuthenticator : Set SASL client state to SEND_HANDSHAKE_REQUEST
2016-11-23 08:48:20.915 DEBUG 72885 --- [sage-hub-sample] o.a.k.c.s.a.SaslClientAuthenticator : Creating SaslClient: client=messagehub/[email protected];service=kafka;serviceHostname=kafka05-prod01.messagehub.services.us-south.bluemix.net;mechs=[PLAIN]
2016-11-23 08:48:20.979 DEBUG 72885 --- [sage-hub-sample] org.apache.kafka.common.metrics.Metrics : Added sensor with name node--5.bytes-sent
2016-11-23 08:48:20.980 DEBUG 72885 --- [sage-hub-sample] org.apache.kafka.common.metrics.Metrics : Added sensor with name node--5.bytes-received
2016-11-23 08:48:20.980 DEBUG 72885 --- [sage-hub-sample] org.apache.kafka.common.metrics.Metrics : Added sensor with name node--5.latency
2016-11-23 08:48:20.982 DEBUG 72885 --- [sage-hub-sample] org.apache.kafka.clients.NetworkClient : Completed connection to node -5
2016-11-23 08:48:21.080 DEBUG 72885 --- [sage-hub-sample] o.a.k.c.s.a.SaslClientAuthenticator : Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
2016-11-23 08:48:21.264 DEBUG 72885 --- [sage-hub-sample] o.a.k.c.s.a.SaslClientAuthenticator : Set SASL client state to INITIAL
2016-11-23 08:48:21.265 DEBUG 72885 --- [sage-hub-sample] o.a.k.c.s.a.SaslClientAuthenticator : Set SASL client state to INTERMEDIATE
2016-11-23 08:48:21.284 DEBUG 72885 --- [sage-hub-sample] o.apache.kafka.common.network.Selector : Connection with kafka05-prod01.messagehub.services.us-south.bluemix.net/23.246.202.55 disconnected
java.io.EOFException: null
at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:488)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:239)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:182)
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:64)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:318)
at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
at java.lang.Thread.run(Thread.java:745)
所有以上記錄爲5個經紀人中的任何一個。這些都是調試語句,所以我不確定它們是否是錯誤。
-Tatha
謝謝@Mickael,那些修改問題的更改 – Tatha