2016-11-21 178 views
0

我正在嘗試將文件中的數據寫入卡夫卡主題。我的代碼如下所示:卡夫卡生產者跳過消息

Properties properties = new Properties(); 
    properties.put("bootstrap.servers", <bootstrapServers>); 
    properties.put("key.serializer", StringSerializer.class.getCanonicalName()); 
    properties.put("value.serializer", StringSerializer.class.getCanonicalName()); 
    properties.put("retries",100); 
    properties.put("linger.ms",5); 
    properties.put("acks", "all"); 

    KafkaProducer<Object, String> producer = new KafkaProducer<>(properties); 

    try (BufferedReader bf = new BufferedReader(new InputStreamReader(new FileInputStream(filePath), "UTF-8"))) { 
     String line; 
     int count = 0; 
     while ((line = bf.readLine()) != null) { 
      count++; 
      producer.send(new ProducerRecord<>(topicName, line)); 
     } 
    producer.flush(); 
     Logger.log("Done producing data messages. Total no of records produced:" + count); 
    } catch (InterruptedException | ExecutionException | IOException e) { 
     Throwables.propagate(e); 
    } finally { 
     producer.close(); 
    } 

數據大小高於100萬條記錄。

當我檢查上使用下面的命令代理偏移數據的,只有消息的一半(約5,00,000)被寫入在話題:

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker_list> --time -1 --topic <topic_name> 

上述命令的輸出:

topic_name:1:292954 
topic_name:0:296787 

我應該怎麼做才能確保所有內容都寫在主題上。

+0

你能顯示GetOffsetShell命令的實際輸出嗎? – C4stor

+0

在問題中添加了輸出。 –

+0

應用程序日誌文件中的計數值是多少?它顯示1米嗎? – notionquest

回答

0

發送消息是異步的。在處理所有消息之前,您可能正在檢查偏移量。

+0

日誌保留時間爲24小時。而且我正在檢查消息後產生消息,這幾乎需要4-5分鐘。 –

+0

你完全改變了答案。 –

+0

是的,我的想法很糟糕,意識到保留不會成爲問題,我在更改之前忘記刷新 –

相關問題