弗林克管道如下:在弗林克使用神交流
- 讀卡夫卡主題的消息(字符串)。
- 模式匹配通過grok轉換爲json格式。
- 從json提取的字段上的時間窗口上的聚合。
以下是使用grok進行模式匹配的代碼。
SingleOutputStreamOperator<JSONObject> mainStream = messageStream.rebalance()
.map(new MapFunction<String, JSONObject>() {
private static final long serialVersionUID = 6;
@Override
public JSONObject map(String value) throws Exception {
JSONObject logJson = new JSONObject();
grok.compile(pattern); //pattern is some pattern defined in the class
Match gm = grok.match(value);
gm.captures();
logJson.putAll(gm.toMap());
return logJson;
}})
在上面的代碼編寫grok.compile(pattern)
map函數內部工作正常。不這樣做提供了以下錯誤
的MapFunction的實現是不可序列
產生的原因:java.io.NotSerializableException:com.google.code.regexp.Pattern
是有什麼方法可以將地圖外部的grok.compile刪除。根據我的理解,每條消息都不需要彙編模式,如果不是,則可能會造成瓶頸。的消息變得相當大。
PS:我已經導入了包oi.thekraken.grok.api.Grok
編輯:
我通過神交實施看着和神交類實現Serializable接口。 https://github.com/thekrakken/java-grok/blob/master/src/main/java/io/thekraken/grok/api/Grok.java
'基本上,你可以自定義類型註冊KRYO串行或實施(去)的序列化自己,如果你需要的數量不能直接序列化的操作員成員。「---我對如何執行相同操作有點困惑。 – user3351750