2
我有一個情況我需要檢查一個特定的消息是否已經存在一個主題或不在,我需要的主題絕對沒有重複。阿帕奇卡夫卡:檢查消息的存在,在主題
任何一個可以建議這樣做,而不是消耗所有的消息和檢查,對其中任何優雅的方式。
我有一個情況我需要檢查一個特定的消息是否已經存在一個主題或不在,我需要的主題絕對沒有重複。阿帕奇卡夫卡:檢查消息的存在,在主題
任何一個可以建議這樣做,而不是消耗所有的消息和檢查,對其中任何優雅的方式。
我不認爲自己在卡夫卡的專家,但我認爲你假裝什麼是「反對」卡夫卡的本質。
但是我出來使用Java的卡夫卡流庫的解決方案。基本上,該方法如下:
地圖的每個消息到一個新的鍵值,其中關鍵是早期密鑰和它的值的組合:(key1, message1) -> (key1-message1, message1)
組使用按鍵消息,作爲此操作的結果,您將獲得KGroupedStream。
應用reduce函數,將值修改爲一些自定義值,例如字符串「重複值」。
轉換所產生的KTable後減少到KStream並將它推到一個新的卡夫卡主題。
有在前面的解釋這麼多的假設,我要爲了提供一些代碼給一些輕:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> resources = builder.stream("topic-where-the-messages-are-sent");
KeyValueMapper<String, String, KeyValue<String,String>> kvMapper = new KeyValueMapper<String, String, KeyValue<String,String>>() {
public KeyValue<String, String> apply(String key, String value) {
return new KeyValue<String, String>(key + "-" + value, value);
}
};
Reducer<String> reducer = new Reducer<String>() {
public String apply(String value1, String value2) {
return "Duplicated message";
}
};
resources.map(kvMapper)
.groupByKey()
.reduce(reducer, "test-store-name")
.toStream()
.to("unique-message-output");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
有想法,這可能不是一個最佳的解決方案,也許你不會認爲這是解決你的問題的「優雅」方式。
我希望它有幫助。