2017-03-05 121 views
2

我有一個現有的Kafka Streams應用程序與0.10.1.1工作正常。更新爲新的0.10.2.0 Kafka Streams庫以及新的代理(儘管新庫可向後兼容0.10.1.1)。快速背景Kafka Streams應用程序問題與版本0.10.2.0

  • 我必須建立在交互式查詢元數據的頂部基於API一個REST API
  • 它會搜索本地存儲和還查詢遠程存儲(使用StreamsMetadata通過KafkaStreams.allMetadataForStore方法獲得)
  • 使用application.server配置參數爲了這個工作

應用程序正常工作與單個應用程序實例。當我開始另一個實例並通過REST查詢店裏,我會遇到以下問題

如果我執行哪個最先啓動的節點上的搜索,本地存儲檢索與此異常

SEVERE: Error - the state store, my-store, may have migrated to another instance. 
org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, in-memory-avg-store, may have migrated to another instance. 
     at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:49) 
     at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:55) 
     at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:699) 
     **at mycode.getLocalMetrics(myclass.java:121)** 
     **at mycode.remote(myclass.java:98)** 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
     at java.lang.reflect.Method.invoke(Unknown Source) 
失敗

如果我執行新的節點上的搜索,本地存儲的搜索是很好,但我看到.forEach一個空指針(新消費()

ks.allMetadataForStore(storeName) 
        .stream() 
        .filter(sm -> !(sm.host().equals(thisInstance.host()) && sm.port() == thisInstance.port())) //only query remote node stores 
        .forEach(new Consumer<StreamsMetadata>() { 
         @Override 
         public void accept(StreamsMetadata t) { 
          //some logic 
         } 

        }); 

什麼可能我會丟失?

+0

FYI:請參閱http://docs.confluent.io/current/streams/faq常見問題 - .html#handling-invalidstatestoreexception-the-state-store-may-migrate-to-another-instance獲取更多詳細信息以及如何解決該問題的信息。 –

回答

1

如果您啓動多個實例,則存儲可能會從一個實例遷移到另一個實例。如果您在商店遷移之前收集有關商店位置的元數據,則您的元數據不再正確。因此,您需要刷新元數據(即再次收集它)。

例外org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, in-memory-avg-store, may have migrated to another instance表示您的元數據失速。

因此,您可以捕獲此異常並重試從中恢復。

+1

是的。我以前看過這個,我知道這發生在商店遷移期間。但問題是,在0.10.2.0的情況下,這種遷移似乎永遠不會結束,即第二個節點一開始,應用程序就會進入這種狀態。另外,正如我所提到的,一切工作正常與v 0.10.1.1 - 沒有代碼改變如此。這正是我爲什麼有點困惑。 – Abhishek

+0

實例中的日誌告訴你什麼?應用程序是否重新平衡? –

+0

您可以使用'KafkaStreams.state()'來查看重新平衡是否完成,並且當您嘗試查詢時,您的應用程序處於何種狀態? –

1

也許你需要爲StreamsMetadata的創建添加一點'睡眠時間'。當我註釋掉Thread.sleep(1000L);時,我有同樣的錯誤。

KafkaStreams streams = new KafkaStreams(builder, props); 
    streams.start(); 

    Thread.sleep(1000L);//If commented out,error occur 
    System.out.println(streams.allMetadataForStore("statestore").size()); 
0

基於Matthias J. Sax意見,一個可以載入流存儲流statelistener裏面像下面

streams.setStateListener((newState, oldState) -> { 
    if (newState == State.RUNNING && oldState.REBALNCING) { 
     System.out.println(streams.allMetadataForStore("statestore").size()); 
    }}); 
相關問題