2016-09-02 25 views
0

在java服務器端經過一些處理後,我通過restful webservice從服務器發送日誌數據(json格式)到kafka。在發佈數據損壞時跳過kafka中的接收器步驟

在hdfs端我的接收器類型是avro。所以爲了解析json(source)到avro(destination),我使用了morphline和avro schema。

如果公佈的數據不適合morphline或Avro的模式,通常我得到下面的錯誤,

造成的:com.fasterxml.jackson.core.JsonParseException:非法 加引號字符((CTRL -CHAR,代碼10)):必須使用 反斜槓轉義被列入字符串值

另外,如果我得到這個曾經,偏移不再移動。簡而言之,如果kafka僅獲取一次該錯誤,它就不能再接收發布的數據。

爲了避免這個錯誤,我想有2個解決方案。首先是在服務器端編寫用於大數據端的avro模式的json驗證器。我首選的第二種方式是跳過並且不會沉沒未格式化爲請求的avro模式的日誌數據。但是,在跳過損壞的數據之後,如果kafka獲得合適的數據,它應該將其淹沒。

我認爲如果我在flume或kafka配置文件中添加一些參數是可能的。那麼當發佈的數據不適合請求的模式或請求的morphline時,如何跳過接收步驟?

回答

0

我解決了這個問題,在morphline側,

新增的try-catch代碼塊中morphline像

morphlines: [ 
    { 
    id: convertJsonToAvro 
    importCommands: [ "org.kitesdk.**" ] 
    commands: [ 
     { 
     tryRules { 
       catchExceptions : true 
      rules : [ 
      { 
       commands : [ 
       # save initial state 
       { readJson {} } 
       # extract JSON objects into fields 
       { extractJsonPaths { 
       flatten: true 
       paths: { 
      PROJECT_NAME: /PROJECT_NAME 
      WSDL_NAME: /WSDL_NAME 
      .... 
      .... 
      .... 
      MESSAGE_OUT: /MESSAGE_OUT 
     } 
     } } 
     # convert the extracted fields to an avro object 
     # described by the schema in this field 
     { toAvro { 
     schemaFile:/u0x/myPath/myAvroSchema.avsc 
     } } 
     # serialize the object as avro 
     { writeAvroToByteArray: { 
     format: containerlessBinary 
       } } 
      ] 
     } 
     { 
      commands : [ 
      { logWarn { format : "Ignoring record with unsupported input format in myLogService: {}", args : ["@{}"] } } 
      { dropRecord {} }  
      ] 
     } 
     ] 
    } 
    }  
    ] 
    } 
] 

tryRules我迫使代碼捕獲所有異常。

rules:你可以寫任何你想要的"command:"塊,如果其中一個引發一個異常,除了最後一個命令塊,最後一個命令將會運行。請記住,最後一個是「趕上」。我的意思是我的情況下,如果第一個命令塊失敗,最後一個(第二個)命令將運行。如果第一個命令完美運行,則最後一個命令將不起作用,因爲上一個命令塊像catch塊一樣工作。

所以,當代碼readJson {}在第一個命令塊失敗時,它拋出一個異常,最後一個命令(catch塊)處理它,所以它不嘗試卡夫卡主題中吸收電流的數據,因爲它會運行dropRecord {}

有關詳細文檔,您可以訪問kitesdk