2016-07-17 28 views
0
加入一個空指針2個卡夫卡KTable結果

我試圖連接兩個卡夫卡流DSL KTable使用:在RocksDB

KTable<String, String> source = builder.table("stream-source"); 
KTable<String, String> target = builder.table("stream-target"); 
source.join(target, new ValueJoiner<String, String, String>() { 
    public String apply(String value1, String value2) { 
     return value1 + ":" + value2; 
    } 
}); 

我已經確定,無論是鍵和值不null

Producer<String, String> producer = new KafkaProducer<String, String>(props); 
for(int i = 0; i < PERSONS_SOURCE.length; i++) { 
    producer.send(new ProducerRecord<String, String>("stream-source",  Long.toString(i + 1L), PERSONS_SOURCE[i])); 
} 
for(int i = 0; i < PERSONS_TARGET.length; i++) { 
    producer.send(new ProducerRecord<String, String>("stream-target", Long.toString(i + 1L), PERSONS_TARGET[i])); 
} 
producer.close(); 

但是應用程序報告RocksDB層中有關於分區的空指針。

[2016年7月17日21:58:04682] ERROR用戶提供收聽org.apache.kafka.streams.processor.internals.StreamThread用於組流-persons2 1 $上分區分配失敗(org.apache。 kafka.clients.consumer.internals.ConsumerCoordinator) java.lang.NullPointerException at org.rocksdb.RocksDB.put(RocksDB.java:432) at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal( RocksDBStore.java:299) at org.apache.kafka.streams.state.internals.RocksDBStore.access $ 200(RocksDBStore.java:62) at org.apache.kafka.streams.state.internals.RocksDBStore $ 3.restore( RocksDBStore.java:206) at org.apache.kafka.streams.processor .internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:210) at org.apache.kafka.streams.processor.internals .ProcessorContextImpl.register(ProcessorContextImpl.java:116) 在org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:202)

回答

1

實測值的問題是由於流被在應用程序代碼中創建而不是使用以下命令: -

kafka-topics --create --topic stream-a --replication-factor 1 --partitions 1 

似乎是連接需要分區信息才能工作。

+0

你可能想接受你自己的回答:) –