2016-08-23 50 views
0

我有2個JavaPairDStreams。它們具有相同的鍵(類型和值)和相同的值類型(不同的值)。我需要它們共享相同的狀態以返回基於當前狀態的結果,所以我使用相同的mapWithState函數。相同的mapWithState幾個JavaDStream使用的函數

JavaPairDStream<String, String> inputMessagesStream = readFromKafkaStream1(); 
JavaPairDStream<String, String> inputMessagesStream2 = readFromKafkaStream(); 
Function3<String, Optional<String>, State<MessageState>, String> messageState = (key, value, state) -> { 
       if (state.exists()) { 
        return state.get().process(value.get()); 
       } else { 
        MessageState ms = new MessageState(); 
        ms.process(value.get()); 
        state.update(ms); 
        return null; 
       } 
      }; 

JavaMapWithStateDStream<String, String, MessageState, String> message1 = inputMessagesStream.mapWithState(StateSpec.function(messageState)); 
JavaMapWithStateDStream<String, String, MessageState, String> message2 = inputMessagesStream2.mapWithState(StateSpec.function(messageState)); 

對2個不同的流使用相同的函數可以嗎?狀態是否正確更新並由每個流共享?

回答

0

是的。此功能不會關閉狀態(或任何其他),它將其作爲參數獲取。所以這兩個用途將會從不同的流'mapWithState獲得不同的狀態。

+0

只是爲了確定我不喜歡答案:雖然mapWithState函數可以用於更多的JavaPairDStreams,但State對象不會在流之間共享。每個流都有它自己的狀態對象,對嗎? – Vlad

+0

流一般沒有任何「狀態對象」。 'mapWithState'創建一個特殊類型的流,它具有狀態對象(每個鍵對應一個)。當然,調用它兩次將創建具有單獨狀態的流(即使您在單個流上調用它兩次)。 –