2013-12-09 19 views

回答

1

如果您正在使用KafkaSpout確保以下幾點:

  1. 在你SpoutConfig「ID」和 重新部署拓撲結構的新版本後,「zkroot」不改變風暴使用「 zkroot」,「id」將主題偏移量存儲到zookeeper中
  2. KafkaConfig.forceFromStart設置爲false。

KafkaSpout將偏移量存儲到zookeeper中。如果您在KafkaSpout的KafkaConfig中將forceFromStart設置爲true(可能是您首次部署拓撲時的情況),那麼在重新部署期間要非常小心,它會忽略存儲的zookeeper偏移量。確保將其設置爲false。

考慮編寫拓撲結構,以便在拓撲的main()方法執行時從屬性文件中讀取KafkaConfig.forceFromStart值。這將允許您的管理員控制是否重播Kafka消息。

2

spoutConfig.forceStartOffsetTime(-1);

它會選擇最新的偏移周圍的時間戳寫入啓動消費。你可以用 強制噴口總是從最新的偏移量開始,通過傳入-1,你可以強制 它通過傳入-2來從最早的偏移量開始。

references

0

基本上事件序列將是:

  1. 第一次啓動拓撲通過從開始讀取與以下屬性:

    forceFromStart = true 
    
    startOffsetTime = -2 
    

上述道具將迫使它來啓動從主題開始。請記住擁有兩個屬性,因爲forceFromStart會讓風暴讀取startOffsetTime屬性,並使用設置的值來確定從何處開始讀取,並忽略zookeeper偏移量。

從現在起,您的拓撲將運行,zookeeper將保持偏移量。如果你的工作人員死亡,它將開始由主管開始,並開始讀取zookeeper中的偏移量。

  • 現在如果你想重新啓動您的拓撲結構,你想從那裏它被關閉之前離開的閱讀,使用下列財產,並重新啓動拓撲:

    forceFromStart = false 
    
  • 通過上述屬性,您告訴風暴不是讀取startOffsetTime值,而是使用在關閉拓撲之前維護的zookeeper偏移量。

    從現在起,每次重新啓動拓撲時,它都會從剩餘的地方讀取數據。

  • 如果你想重新啓動您的拓撲結構,你想從主題的頭部/頂部閱讀,使用下列財產,並重新啓動拓撲:

    forceFromStart = true 
    
    startOffsetTime = -1 
    
  • 通過上述屬性,您可以告訴風暴忽略動物園管理員偏移量,並從最新的偏移量開始,這是該主題的提示。