爲了避免讀取KAFKA STREAMS被殺死時處理但未提交的消息,我希望獲得每個消息的偏移以及鍵和值,以便我可以將它存儲在某處並將其用於避免重新處理已處理的消息。有沒有一種方法可以獲得在kafka流中消費的每條消息的偏移量?
3
A
回答
2
是的,這是可能的。請參閱http://docs.confluent.io/current/streams/faq.html#accessing-record-metadata-such-as-topic-partition-and-offset-information的FAQ條目。
我將在下面的關鍵信息複製粘貼:
訪問記錄元數據,如主題,分區和偏移信息?
記錄元數據可通過Processor API訪問。 它也可以通過DSL間接獲得,這要感謝它的 Processor API integration。
使用Processor API,您可以通過
ProcessorContext
訪問記錄元數據。您可以在Processor#init()
存儲在你的處理器的 實例字段的上下文的引用,然後 查詢中Processor#process()
處理器方面,例如 (同爲Transformer
)。上下文自動更新以匹配 當前正在處理的記錄,這意味着方法 (如ProcessorContext#partition()
)總是返回當前的 記錄的元數據。在punctuate()
內調用處理器 上下文時,有些注意事項適用,請參閱Javadocs以瞭解詳細信息。如果使用DSL與自定義
Transformer
組合,例如, 你可以改變輸入記錄的值也包括分區 和偏移元數據,以及諸如map
或 隨後的DSL業務filter
可以再利用這些信息。
相關問題
- 1. Golang Kafka沒有消耗所有消息偏移西南
- 2. 有沒有辦法阻止特定偏移量的Kafka消費者?
- 3. 使用新消費者API刪除kafka消費者偏移量
- 4. 卡夫卡消費者中的控制消息偏移量
- 5. 如何獲得Kafka 0.10.x中沒有外殼的電流偏移量?
- 6. 有沒有辦法獲得dailymotion實時流的聊天消息?
- 7. 有沒有一種方法可以獲取SSIS可以提出的所有異常消息的列表
- 8. 只消費Kafka Queue的第一條消息?
- 9. Apache Kafka消費者組的偏移量如何到期?
- 10. 如何獲取Golang Kafka中的分區的消費者組偏移量10
- 11. 有沒有一個很好的例子,用於消費通用json消息的Spring雲流kafka活頁夾
- 12. 有沒有一種方法可以在Android中獲得沒有GPS的速度?
- 13. 從kafka消費消息時的異常
- 14. 有沒有一種方法可以讓ActionListener取消?
- 15. 是否有可能在Spark + KafkaRDD中獲得特定的消息偏移
- 16. 有沒有一種方法可以確定在用RabbitMQ提供「N之前的消息」計數器的特定消息之前的消息數量?
- 17. 有沒有辦法跟蹤Akka中的每條消息?
- 18. 在卡夫卡0.9有沒有一種方法可以列出消費者羣中所有消費者的抵消額?
- 19. 當沒有消費者時,獲取消息以堅持RabbitMQ
- 20. 有沒有辦法取消Google App Engine中的流量遷移?
- 21. 消費流,轉型,然後交給其他消費者(沒有州)的方法
- 22. 無法讀取一條消息與基於java的Kafka消費者
- 23. python-kafka:消費者可以根據消息屬性跳過消息嗎?
- 24. 使用KAFKA REST API消費JSON消息
- 25. 有沒有任何方法可以在沒有交互式消息的情況下獲得按鈕回答?
- 26. 有沒有一種方法可以在主要的FOREX消息下以編程方式避免EA交易?
- 27. 有沒有一種方法可以記錄Smalltalk上每個消息參數的類型? (如Objective-C)
- 28. kafka消費者提取API不返回正確的偏移值
- 29. spring-kafka(未集成)消費者不消費消息
- 30. 有沒有一種方法可以免費安裝.NET 3.5 SP1?