1
我用下面的代碼C++
產生消息Kafka
:C++與卡夫卡 - 消費者只得到一些製片人的消息
#include <thread>
#include <cppkafka/producer.h>
using namespace cppkafka;
int main()
{
for(int i = 0 ; i < 100 ; i++)
{
std::cout << "sending msg number: " << i << std::endl;
std::string str("{'msg number' : " + std::to_string(i) + "}");
// Create a message builder for this topic
MessageBuilder builder("test");
// Construct the configuration
Configuration config =
{
{ "metadata.broker.list", "192.168.1.100:9092"}
};
// Create the producer
Producer producer(config);
builder.payload(str);
producer.produce(builder); //Only a few messages are received!
std::this_thread::sleep_for(std::chrono::milliseconds(50));//If I remove this, no message is received!
}
}
在我的機器,我已經運行Zookeeper
和Kafka server
,我跑一個consumer
,以顯示接收的消息,通過使用這樣的:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.100:9092 --topic test
我C++
代碼產生以下:
sending msg number: 0
sending msg number: 1
sending msg number: 2
sending msg number: 3
(...) //from 0 to 99...all the messages are sent!
sending msg number: 98
sending msg number: 99
我期待消費者,接收所有這些消息,但我看到的只是幾個:
{'msg number' : 40}
{'msg number' : 58}
{'msg number' : 70}
{'msg number' : 75}
{'msg number' : 91}
{'msg number' : 96}
並沒有更多的好評。
如果我刪除行:
std::this_thread::sleep_for(std::chrono::milliseconds(50));
我沒有收到任何消息。 爲什麼我的Kafka
服務器沒有收到我的所有消息?
難道是某種形式的垃圾郵件防護,即從過快發送消息和服務器只是開溝他們>? – GPPK
我想到了這一點,這就是爲什麼我放置了睡眠(50毫秒)。無論哪種方式,如果我使用這個,我仍然沒有收到所有的消息..卡夫卡是否應該能夠接收一切,並排隊他們? – waas1919
嗨@GPPK你可以把你的評論作爲答案?你是對的,這就是這個問題的答案。 – waas1919