2013-07-23 26 views
2

至於簡單的消費提到這裏simpleConsumer模塊是否有任何解決方法來只讀取新消息?

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

另外請注意,我們明確地檢查偏移讀取 比失調,我們要求的不是更少。這是必要的,因爲如果 Kafka正在壓縮消息,則即使所請求的偏移不是壓縮塊的起始點 ,取回請求也將返回整個壓縮塊。因此,我們之前看到的消息 可能會再次返回。

最後,我們跟蹤讀取的消息數量。如果我們在最後一次請求中沒有閱讀任何內容,我們會睡一會兒,所以當沒有數據時,我們不會敲打卡夫卡。

就像在我的程序中一樣,它先讀取一條舊信息,隨着它變老而進入睡眠狀態,然後讀取新記錄。

任何解決方法,以便SimpleConsumer只讀取新消息?

回答

0

在同一頁面

public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, 
           long whichTime, String clientName) { 
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); 
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); 
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); 
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName); 
    OffsetResponse response = consumer.getOffsetsBefore(request); 

    if (response.hasError()) { 
     System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition)); 
     return 0; 
    } 
    long[] offsets = response.offsets(topic, partition); 
    return offsets[0]; 
} 

它說,尋找偏移讀取

卡夫卡包括兩個常量的幫助,kafka.api.OffsetRequest.EarliestTime()找到數據的開始在日誌中並從那裏開始流式傳輸,kafka.api.OffsetRequest.LatestTime()將只傳輸新消息。不要認爲偏移量0是開始偏移量,因爲消息隨時間超出日誌。

+0

答案有點不相關,因爲OP描述的情況與0無關,因爲它是起始偏移量 - 它是壓縮消息的怪癖。 –

相關問題