2017-05-22 36 views
2

我想在NiFi中創建一個流,該流需要一個有效的json文件並使用PutHiveStreaming處理器將其直接放入配置單元表中。我的JSON看起來像下面這樣:需要幫助推斷NiFi中的json文件的avro模式

{ 
"Raw_Json": { 
    "SystemInfo": { 
     "Id": "a string ID", 
     "TM": null, 
     "CountID": "a string ID", 
     "Topic": null, 
     "AccountID": "some number", 
     "StationID": "some number", 
     "STime": "some Timestamp", 
     "ETime": "some Timestamp" 
    }, 
    "Profile": { 
     "ID": "ID number", 
     "ProductID": "Some Number", 
     "City": "City Name", 
     "State": "State Name", 
     "Number": "XXX-XXX-XXXX", 
     "ExtNumber": null, 
     "Unit": null, 
     "Name": "Person Name", 
     "Service": "Purchase", 
     "AddrID": "00000000", 
     "Products": { 
      "Product": [{ 
       "Code": "CODE", 
       "Description": "some description" 

      }, 
      { 
       "Code": "CODE", 
       "Description": "some description" 

      }, 
      { 
       "Code": "CODE", 
       "Description": "some description" 

      }, 
      { 
       "Code": "CODE", 
       "Description": "some description" 

      }, 
      { 
       "Code": "CODE", 
       "Description": "some description" 

      }, 
      { 
       "Code": "CODE", 
       "Description": "some description" 

      }, 
      { 
       "Code": "CODE", 
       "Description": "some description" 

      }, 
      { 
       "Code": "CODE", 
       "Description": "some description" 

      }, 
      { 
       "Code": "CODE", 
       "Description": "some description" 

      }, 
      { 
       "Code": "CODE", 
       "Description": "some description" 
      }] 
     } 
    }, 
    "Total": { 
     "Amount": "some amount", 
     "Delivery": "some address", 
     "Estimate": "some amount", 
     "Tax": null, 
     "Delivery_Type": null 

    } 

}, 
"partition_date":"2017-05-19" 

}

我得到的JSON,使用InferAvroSchema處理器和使用推斷的Avro模式有轉換JSON來的Avro格式,並把它發送到PutHiveStreaming處理器。我的流程看起來是這樣的:

主要目標是,我想所有的「Raw_Json」列在蜂巢表被倒入一列,該表將被「partition_date進行分區「列將成​​爲表格的第二列。問題是,由於某種原因NiFi是有推斷從「Raw_Json」列中的嵌套JSON的問題,如下圖所示就像在桌子空爲之傾倒:

有誰知道我怎麼可能做出NiFi讀取「Raw_Json」列的整個嵌套Json作爲一列並將其發送到配置單元表?我如何創建自己的avro模式來執行此操作?任何有關如何解決這個問題的見解或想法將不勝感激!

回答

2

通常情況下,只要您的輸入文件格式假設總是相同,您就必須創建或生成(推斷)avro模式一次 - 兩個字段Raw_Jsonpartition_date

,你應該在文件中這樣的事情,例如avro-schema.json

{ 
    "type" : "record", 
    "name" : "test", 
    "fields" : [ { 
    "name" : "Raw_Json", 
    "type" : 
    ... 
    }, { 
    "name" : "partition_date", 
    "type" : "string", 
    "doc" : "Type inferred from '\"2017-05-19\"'" 
    } ] 
} 

而且在ConvertJSONToAvro處理器使用此文件作爲Record Schema

類型列Raw_Json的:

或者你也必須與所有嵌套的領域,陣列等

或完全定義複雜的數據類型,如果你想要寫的Raw_Json的內容轉換成字符串列,那麼在將文件轉換爲avro之前,必須將其轉換爲字符串。您可以使用EvaluateJsonPathAttributesToJson處理器的順序。

+0

非常感謝,我能夠通過使用EvaluateJsonPath和AttributesToJson處理器將它轉換爲字符串列,現在它工作正常。 –