0
我有2個JavaPairDStreams。它們具有相同的鍵(類型和值)和相同的值類型(不同的值)。我需要它們共享相同的狀態以返回基於當前狀態的結果,所以我使用相同的mapWithState函數。相同的mapWithState幾個JavaDStream使用的函數
JavaPairDStream<String, String> inputMessagesStream = readFromKafkaStream1();
JavaPairDStream<String, String> inputMessagesStream2 = readFromKafkaStream();
Function3<String, Optional<String>, State<MessageState>, String> messageState = (key, value, state) -> {
if (state.exists()) {
return state.get().process(value.get());
} else {
MessageState ms = new MessageState();
ms.process(value.get());
state.update(ms);
return null;
}
};
JavaMapWithStateDStream<String, String, MessageState, String> message1 = inputMessagesStream.mapWithState(StateSpec.function(messageState));
JavaMapWithStateDStream<String, String, MessageState, String> message2 = inputMessagesStream2.mapWithState(StateSpec.function(messageState));
對2個不同的流使用相同的函數可以嗎?狀態是否正確更新並由每個流共享?
只是爲了確定我不喜歡答案:雖然mapWithState函數可以用於更多的JavaPairDStreams,但State對象不會在流之間共享。每個流都有它自己的狀態對象,對嗎? – Vlad
流一般沒有任何「狀態對象」。 'mapWithState'創建一個特殊類型的流,它具有狀態對象(每個鍵對應一個)。當然,調用它兩次將創建具有單獨狀態的流(即使您在單個流上調用它兩次)。 –