2017-09-01 75 views
1

使用Processor API和addGlobalStore函數創建全局KTable時,生成的存儲會填充OK。但隨後嘗試遍歷商店的內容導致以下異常:在Kafka中查詢全局狀態存儲Streams會拋出Null異常

Exception in thread "main" java.lang.NullPointerException                            at org.apache.kafka.streams.state.internals.SerializedKeyValueIterator.next(SerializedKeyValueIterator.java:63)            at org.apache.kafka.streams.state.internals.SerializedKeyValueIterator.next(SerializedKeyValueIterator.java:26)            at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueIterator.next(MeteredKeyValueStore.java:208)         at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueIterator.next(MeteredKeyValueStore.java:189)         at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore$CompositeKeyValueIterator.next(CompositeReadOnlyKeyValueStore.java:155)   at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore$CompositeKeyValueIterator.next(CompositeReadOnlyKeyValueStore.java:113)   at org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator.hasNext(DelegatingPeekingKeyValueIterator.java:63) 

這是從簡單的代碼,在專賣店迭代:

ReadOnlyKeyValueStore<String, StreamConfig> store = this.globalStreams.store("config-table",QueryableStoreTypes.<String, Config>keyValueStore()); 
final KeyValueIterator<String, Config> iteratble = store.all(); 
HashMap<String, Config> dynamicStreams = new HashMap<String,Config>(); 
while (iteratble.hasNext()) { 
    final KeyValue<String, Config> next = iteratble.next(); 
    dynamicStreams.put(next.key, next.value); 
} 
iteratble.close(); 

的條目的任何訪問該商店將導致該例外。如果我使用globalTable函數創建全局狀態存儲(如果這不是反序列化問題),則同樣的主題可以正常工作。狀態存儲還會成功返回行數(因此它完全填充)。

全局狀態存儲創建如下,這是使用Kafka Streams 0.10.2.1。

KStreamBuilder globalBuilder = new KStreamBuilder(); 
StateStoreSupplier<KeyValueStore<String, Config>> storeSupplier = Stores 
    .create("config-table") 
    .withKeys(Serdes.String()) 
    .withValues(configSerdes) 
    .persistent() 
    .build(); 

// a Processor that updates the store 
ProcessorSupplier<String, Config> procSupplier =() -> new ConfigWorker(); 
globalBuilder.addGlobalStore(
    storeSupplier.get(), 
    "config-table-source", 
    new StringDeserializer(), 
    configDeserializer, 
    "config", 
    "config-worker", 
    procSupplier) 
    .buildGlobalStateTopology(); 

this.globalStreams = new KafkaStreams(
    globalBuilder, 
    this.getProperties()); 
globalStreams.start(); 

編輯:

  • 的一個問題是,默認情況下狀態存儲記錄和全球KTables沒有被更改日誌備份。因此,向國家商店創建添加一個.disableLogging解決了這個問題。

  • 另一個問題似乎是在處理器的init函數中致電context.schedule。這是拋出一個無效操作異常。刪除這個固定的代碼,但我想punctuate現在不會被調用。它似乎現在工作 - 但不清楚爲什麼我不能撥打schedule

回答

0

似乎不允許在全局上下文中進行轉發,按照此code。所以我的全局處理器只需要處理入站更新。

相關問題