2016-11-15 47 views
1

弗林克管道如下:在弗林克使用神交流

  1. 讀卡夫卡主題的消息(字符串)。
  2. 模式匹配通過grok轉換爲json格式。
  3. 從json提取的字段上的時間窗口上的聚合。

以下是使用grok進行模式匹配的代碼。

SingleOutputStreamOperator<JSONObject> mainStream = messageStream.rebalance() 
        .map(new MapFunction<String, JSONObject>() {  
         private static final long serialVersionUID = 6; 

         @Override 
         public JSONObject map(String value) throws Exception { 
          JSONObject logJson = new JSONObject(); 
          grok.compile(pattern); //pattern is some pattern defined in the class 
          Match gm = grok.match(value); 
          gm.captures(); 
          logJson.putAll(gm.toMap()); 
          return logJson; 
         }}) 

在上面的代碼編寫grok.compile(pattern) map函數內部工作正常。不這樣做提供了以下錯誤

的MapFunction的實現是不可序列

產生的原因:java.io.NotSerializableException:com.google.code.regexp.Pattern

是有什麼方法可以將地圖外部的grok.compile刪除。根據我的理解,每條消息都不需要彙編模式,如果不是,則可能會造成瓶頸。的消息變得相當大。

PS:我已經導入了包oi.thekraken.grok.api.Grok

編輯:

我通過神交實施看着和神交類實現Serializable接口。 https://github.com/thekrakken/java-grok/blob/master/src/main/java/io/thekraken/grok/api/Grok.java

回答

0

您的代碼並不顯示在局部變量神交從何而來,但:

弗林克要求所有運營商可序列化,因爲它們可能會在集羣中四處移動。這也適用於所有運營商的成員。你能發佈一個完整的非工作例子嗎?這可能會更容易看到序列化可能失敗的位置。

約弗林克系列化

更多信息,請弗林克文檔在 https://flink.apache.org/faq.html#why-am-i-getting-a-nonserializableexception-https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html

主要是ound,你可以,如果你需要運營商成員註冊自定義類型一KRYO串行或實施(去)序列化自己,不是直接可序列化的。

順便說一句:我想你是對試圖減少時間圖案編譯

+0

'基本上,你可以自定義類型註冊KRYO串行或實施(去)的序列化自己,如果你需要的數量不能直接序列化的操作員成員。「---我對如何執行相同操作有點困惑。 – user3351750