2017-10-12 126 views
1

我用下面的代碼C++產生消息KafkaC++與卡夫卡 - 消費者只得到一些製片人的消息

#include <thread> 
#include <cppkafka/producer.h> 

using namespace cppkafka; 

int main() 
{ 
    for(int i = 0 ; i < 100 ; i++) 
    { 
     std::cout << "sending msg number: " << i << std::endl; 
     std::string str("{'msg number' : " + std::to_string(i) + "}"); 

     // Create a message builder for this topic 
     MessageBuilder builder("test"); 

     // Construct the configuration 
     Configuration config = 
     { 
      { "metadata.broker.list", "192.168.1.100:9092"} 
     }; 

     // Create the producer 
     Producer producer(config); 

     builder.payload(str); 

     producer.produce(builder); //Only a few messages are received! 

     std::this_thread::sleep_for(std::chrono::milliseconds(50));//If I remove this, no message is received! 
    } 
} 

在我的機器,我已經運行ZookeeperKafka server,我跑一個consumer,以顯示接收的消息,通過使用這樣的:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.100:9092 --topic test 

C++代碼產生以下:

sending msg number: 0 
sending msg number: 1 
sending msg number: 2 
sending msg number: 3 
(...) //from 0 to 99...all the messages are sent! 
sending msg number: 98 
sending msg number: 99 

我期待消費者,接收所有這些消息,但我看到的只是幾個:

{'msg number' : 40} 
{'msg number' : 58} 
{'msg number' : 70} 
{'msg number' : 75} 
{'msg number' : 91} 
{'msg number' : 96} 

並沒有更多的好評。

如果我刪除行:

std::this_thread::sleep_for(std::chrono::milliseconds(50)); 

我沒有收到任何消息。 爲什麼我的Kafka服務器沒有收到我的所有消息?

+1

難道是某種形式的垃圾郵件防護,即從過快發送消息和服務器只是開溝他們>? – GPPK

+0

我想到了這一點,這就是爲什麼我放置了睡眠(50毫秒)。無論哪種方式,如果我使用這個,我仍然沒有收到所有的消息..卡夫卡是否應該能夠接收一切,並排隊他們? – waas1919

+1

嗨@GPPK你可以把你的評論作爲答案?你是對的,這就是這個問題的答案。 – waas1919

回答

1

原文評論:

這可能是有點垃圾郵件防禦,即從過快發送消息和服務器只是拋棄他們

有很多不同的反垃圾郵件技術開發由網絡服務提供商提供這是通過發送大量網絡交易來阻止某人發送垃圾郵件。我覺得可能是你在做什麼

https://en.wikipedia.org/wiki/Anti-spam_techniques