0
我想在Kafka(0.11)中的聚合函數中使用SessionWindows,但無法理解,爲什麼我會收到錯誤。在KafkaStreams中使用SessionWindows聚合數據(0.11)
這裏是我的代碼片段:
// defining some values:
public static final Integer SESSION_TIMEOUT_MS = 6000000;
public static final String INTOPIC = "input";
public static final String HOST = "host";
// setting up serdes:
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
// some more code to build up the streams
KStreamBuilder builder = new KStreamBuilder();
KStream<String, JsonNode> dataStream = builder.stream(Serdes.String(), jsonSerde, INTOPIC);
// constructing the initalMessage ObjectNode:
ObjectNode initialMessage = JsonNodeFactory.instance.objectNode();
initialMessage.put("count", 0);
initialMessage.put("endTime", "");
// transforming data to KGroupedStream<String,JsonNode>
KGroupedStream<String, JsonNode> data = dataStream.map((key, value) ->{return new KeyValue<>(value.get(HOST).asText(), value); }).groupByKey(Serdes.String(), jsonSerde);
// finally aggregate the data usind SessionWindows
KTable<Windowed<String>, JsonNode> aggregatedData = data.aggregate(
() -> initialMessage,
(key, incomingMessage, initialMessage) -> countData(incomingMessage, initialMessage),
SessionWindows.with(SESSION_TIMEOUT_MS),
jsonSerde,
"aggregated-data");
private static JsonNode countData(JsonNode incomingMessage, JsonNode initialMessage){
// some dataprocessing
}
當我改變
KTable<Windowed<String>,JsonNode>
到
KTable<String, JsonNode>
,並刪除
SessionWindows.with(SESSION_TIMEOUT_MS)
來自聚合函數,一切都很好。
如果我不這樣做,日食告訴我要行
KTable<Windowed<String>, JsonNode> aggregatedData = data.aggregate([...])
在類型KGroupedStream的方法彙總(初始化程序,聚合時,Windows,SERDE,字符串)不適用的參數(( ) - > {},(鍵,incomingMessage,initialMessage) - > {},SessionWindows,SERDE,字符串)
和用於線
() -> initialMessage
類型不匹配:不能從ObjectNode轉換爲VR
和:
(key, incomingMessage, initialMessage) -> countData(incomingMessage, initialMessage),
在類型DataWindowed方法countData(JsonNode,JsonNode)是不適用的參數( JsonNode,VR)
我真的沒有看到,哪裏的類型迷路了! 任何提示都會很棒!
THX:d
難道只是一個錯字'(key.value )' - >'(key,value)'(逗號而不是點)? –
是的,對不起。只是在這篇文章中修復它。但這不是解決方案。 你有任何其他的想法,如何解決這個問題? – sunjazz
不是從代碼主演。也許我們的示例回購幫助:https://github.com/confluentinc/kafka-streams-examples我們有幾個使用lambdas的示例。 –