2017-10-14 68 views
1

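Failed to flush state store

I'm trying to create a leftJoin in Kafka Streams. It works fine for about 10 records and then crashes with an exception caused by a NullPointerException. The code looks like this: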

private static KafkaStreams getKafkaStreams() { 
    StreamsConfig streamsConfig = new StreamsConfig(getProperties()); 
    KStreamBuilder builder = new KStreamBuilder(); 

    KTable<String, Verkaeufer> umsatzTable = builder.table(Serdes.String(), EventstreamSerde.Verkaeufer(), CommonUtilsConstants.TOPIC_VERKAEUFER_STAMMDATEN); 
    KStream<String, String> verkaeuferStream = builder.stream(CommonUtilsConstants.TOPIC_ANZAHL_UMSATZ_PER_VERKAEUFER); 

    KStream<String, String> tuttiStream = verkaeuferStream.leftJoin(umsatzTable, 
      (tutti, verkaeufer) -> ("Vorname=" + verkaeufer.getVorname().toString() +",Nachname=" +verkaeufer.getNachname().toString() +"," +tutti.toString()), Serdes.String(), Serdes.String()); 

    tuttiStream.to(Serdes.String(), Serdes.String(), CommonUtilsConstants.TOPIC_TUTTI); 

    return new KafkaStreams(builder, streamsConfig); 
} 

The StreamsConfig looks like this:

private static Properties getProperties() { 
    Properties props = new Properties(); 
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CommonUtilsConstants.BOOTSTRAP_SERVER_CONFIGURATION); 
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, CommonUtilsConstants.GID_TUTTI); 
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass()); 
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "1000"); 

    return props; 
} 

Full stack trace:

22:19:36.550 [gid-tutti-8fe6be58-d5c5-41ce-982d-88081b98004e-StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread - stream-thread [gid-tutti-8fe6be58-d5c5-41ce-982d-88081b98004e-StreamThread-1] Failed to commit StreamTask 0_0 state: org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store KTABLE-SOURCE-STATE-STORE-0000000000 
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:262) 
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:190) 
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:282) 
at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:264) 
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187) 
at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259) 
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:253) 
at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:815) 
at org.apache.kafka.streams.processor.internals.StreamThread.access$2800(StreamThread.java:73) 
at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:797) 
at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448) 
at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:789) 
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:778) 
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:567) 
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527) 
Caused by: java.lang.NullPointerException: null 
at java.lang.String.<init>(String.java:143) 
at ch.wesr.eventstream.commonutils.serde.GsonDeserializer.deserialize(GsonDeserializer.java:38) 
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:163) 
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:90) 
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34) 
at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:78) 
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:145) 
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:103) 
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:97) 
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:107) 
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:260) 
... 14 common frames omitted 

Update:

This is what the GsonDeserializer looks like:

public class GsonDeserializer<T> implements Deserializer<T>{ 

    public static final String CONFIG_VALUE_CLASS = "default.value.deserializer.class"; 
    public static final String CONFIG_KEY_CLASS = "default.key.deserializer.class"; 
    private Class<T> deserializedClass; 
    private Gson gson = new GsonBuilder().create(); 

    public GsonDeserializer() {} 

    @Override 
    public void configure(Map<String, ?> config, boolean isKey) { 
     String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS; 
     String clsName = String.valueOf(config.get(configKey)); 
     try { 
      if (deserializedClass == null) { 
       deserializedClass = (Class<T>) Class.forName(clsName); 
      } 
     } catch (ClassNotFoundException e) { 
      System.err.printf("Failed to configure GsonDeserializer. " + 
          "Did you forget to specify the '%s' property ?%n", 
        configKey); 
      System.out.println(e.getMessage()); 
     } 
    } 

    @Override 
    public T deserialize(String s, byte[] bytes) { 
     return gson.fromJson(new String(bytes), deserializedClass); 
    } 

    @Override 
    public void close() {} 
} 
+0

It looks like the exception is coming from your own code: 'at ch.wesr.eventstream.commonutils.serde.GsonDeserializer.deserialize(GsonDeserializer.java:38)'. Can you double-check it?

+0

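The GsonDeserializer is used in several other streams applications, where it works fine. In my code it also works fine for some records (about 10) and then crashes. If I raise StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG to 10000, it works for about 100 records.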

Answer

1

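Your deserializer is never called as long as the cache does not get flushed. That is why it does not fail at the beginning, and why you can increase the time until it fails via the cache size parameter and the commit interval (we flush on commit).

Looking at your code for GsonDeserializer, it seems that new String(bytes) fails with the NPE: the String constructor cannot take null as a parameter. Your deserializer code must guard against bytes == null and should return null directly in that case.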
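The null guard described above can be sketched as follows. This is a minimal, library-free illustration rather than the actual fix applied in the question: the class name NullSafeDeserializer and the parser hook are hypothetical stand-ins for GsonDeserializer and gson.fromJson(json, deserializedClass), and decoding as UTF-8 is an assumption (the original code uses the platform default charset).

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical stand-in for the parsing part of a Kafka Deserializer<T>.
// Neither this class nor its constructor is part of Kafka or Gson.
final class NullSafeDeserializer<T> {

    // Assumed parser hook; in the question this would be
    // json -> gson.fromJson(json, deserializedClass).
    private final Function<String, T> parser;

    NullSafeDeserializer(Function<String, T> parser) {
        this.parser = parser;
    }

    // Mirrors the shape of Deserializer#deserialize(String topic, byte[] data).
    T deserialize(String topic, byte[] bytes) {
        // new String((byte[]) null) throws a NullPointerException,
        // so a null payload is returned as a null value instead.
        if (bytes == null) {
            return null;
        }
        // UTF-8 is an assumption made for this sketch.
        return parser.apply(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

In the GsonDeserializer from the question, the equivalent change would be the same bytes == null check at the top of deserialize, before the call to gson.fromJson.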

+0

Glad it's running now :)

+1

@user7327392: If it works now, you should also "accept" the answer.