2017-10-10 45 views
0

我想在Kafka(0.11)中的聚合函數中使用SessionWindows,但無法理解,爲什麼我會收到錯誤。在KafkaStreams中使用SessionWindows聚合數據(0.11)

這裏是我的代碼片段:

// defining some values: 
public static final Integer SESSION_TIMEOUT_MS = 6000000; 
public static final String INTOPIC = "input"; 
public static final String HOST = "host"; 

// setting up serdes: 
final Serializer<JsonNode> jsonSerializer = new JsonSerializer(); 
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer(); 
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer); 

// some more code to build up the streams 
KStreamBuilder builder = new KStreamBuilder(); 
KStream<String, JsonNode> dataStream = builder.stream(Serdes.String(), jsonSerde, INTOPIC); 

// constructing the initalMessage ObjectNode: 
ObjectNode initialMessage = JsonNodeFactory.instance.objectNode(); 
initialMessage.put("count", 0); 
initialMessage.put("endTime", ""); 

// transforming data to KGroupedStream<String,JsonNode> 
KGroupedStream<String, JsonNode> data = dataStream.map((key, value) ->{return new KeyValue<>(value.get(HOST).asText(), value); }).groupByKey(Serdes.String(), jsonSerde); 

// finally aggregate the data usind SessionWindows 
KTable<Windowed<String>, JsonNode> aggregatedData = data.aggregate(
      () -> initialMessage, 

      (key, incomingMessage, initialMessage) -> countData(incomingMessage, initialMessage), 

      SessionWindows.with(SESSION_TIMEOUT_MS), 

      jsonSerde, 

      "aggregated-data"); 

private static JsonNode countData(JsonNode incomingMessage, JsonNode initialMessage){ 
// some dataprocessing 
} 

當我改變

KTable<Windowed<String>,JsonNode> 

KTable<String, JsonNode> 

,並刪除

SessionWindows.with(SESSION_TIMEOUT_MS) 

來自聚合函數,一切都很好。

如果我不這樣做,日食告訴我要行

KTable<Windowed<String>, JsonNode> aggregatedData = data.aggregate([...]) 

在類型KGroupedStream的方法彙總(初始化程序,聚合時,Windows,SERDE,字符串)不適用的參數(( ) - > {},(鍵,incomingMessage,initialMessage) - > {},SessionWindows,SERDE,字符串)

和用於線

() -> initialMessage 

類型不匹配:不能從ObjectNode轉換爲VR

和:

(key, incomingMessage, initialMessage) -> countData(incomingMessage, initialMessage), 

在類型DataWindowed方法countData(JsonNode,JsonNode)是不適用的參數( JsonNode,VR)

我真的沒有看到,哪裏的類型迷路了! 任何提示都會很棒!

THX:d

+0

難道只是一個錯字'(key.value )' - >'(key,value)'(逗號而不是點)? –

+0

是的,對不起。只是在這篇文章中修復它。但這不是解決方案。 你有任何其他的想法,如何解決這個問題? – sunjazz

+0

不是從代碼主演。也許我們的示例回購幫助:https://github.com/confluentinc/kafka-streams-examples我們有幾個使用lambdas的示例。 –

回答

1

我真的需要實施一個合併:

Merger<? super String, JsonNode>tmpMerger = new MergerClass<String, JsonNode>(); 

,並把它添加到聚合函數:

KTable<Windowed<String>, JsonNode> aggregatedData = data.aggregate(
     () -> initialMessage, 

     (key, incomingMessage, initialMessage) -> countData(incomingMessage, initialMessage), 

     tmpMerger, 

     SessionWindows.with(SESSION_TIMEOUT_MS), 

     jsonSerde, 

     "aggregated-data");