我有一個現有的Kafka Streams應用程序與0.10.1.1工作正常。更新爲新的0.10.2.0 Kafka Streams庫以及新的代理(儘管新庫可向後兼容0.10.1.1)。快速背景Kafka Streams應用程序問題與版本0.10.2.0
- 我必須建立在交互式查詢元數據的頂部基於API一個REST API
- 它會搜索本地存儲和還查詢遠程存儲(使用StreamsMetadata通過KafkaStreams.allMetadataForStore方法獲得)
- 使用application.server配置參數爲了這個工作
應用程序正常工作與單個應用程序實例。當我開始另一個實例並通過REST查詢店裏,我會遇到以下問題
如果我執行哪個最先啓動的節點上的搜索,本地存儲檢索與此異常
SEVERE: Error - the state store, my-store, may have migrated to another instance.
org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, in-memory-avg-store, may have migrated to another instance.
at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:49)
at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:55)
at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:699)
**at mycode.getLocalMetrics(myclass.java:121)**
**at mycode.remote(myclass.java:98)**
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
失敗
如果我執行新的節點上的搜索,本地存儲的搜索是很好,但我看到.forEach一個空指針(新消費()線
ks.allMetadataForStore(storeName)
.stream()
.filter(sm -> !(sm.host().equals(thisInstance.host()) && sm.port() == thisInstance.port())) //only query remote node stores
.forEach(new Consumer<StreamsMetadata>() {
@Override
public void accept(StreamsMetadata t) {
//some logic
}
});
什麼可能我會丟失?
FYI:請參閱http://docs.confluent.io/current/streams/faq常見問題 - .html#handling-invalidstatestoreexception-the-state-store-may-migrate-to-another-instance獲取更多詳細信息以及如何解決該問題的信息。 –