2017-08-02 38 views
0

我發現沒有辦法在NiFi中直接從Avro中提取屬性,所以我使用ConvertAvroToJson-> EvaluateJsonPath - > ConvertJsonToAvro作爲解決方法。需要使用NiFi直接從Avro提取屬性

但我想編寫一個腳本來從Avro流文件中提取屬性用於ExecuteScript處理器,以確定它是否是更好的方法。

有沒有人有腳本來做到這一點?否則,我可能會最終使用原來的方法。

感謝,

凱文

+0

你能解釋一下你想要在提取值後做什麼嗎? –

回答

1

這裏的Groovy腳本(這需要在它的模塊目錄屬性的Avro JAR),我讓用戶指定與JSONPath表達的動態性能,對Avro的文件進行評估。諷刺的是它確實GenericData.toString()這無論如何轉換記錄JSON,但或許是這裏的一些代碼可以重用:

import org.apache.avro.* 
import org.apache.avro.generic.* 
import org.apache.avro.file.* 
import groovy.json.* 
import org.apache.commons.io.IOUtils 
import java.nio.charset.* 

flowFile = session.get() 
if(!flowFile) return 

final GenericData genericData = GenericData.get(); 
slurper = new JsonSlurper().setType(JsonParserType.INDEX_OVERLAY) 

pathAttrs = this.binding?.variables?.findAll {attr -> attr.key.startsWith('avro.path')} 
newAttrs = [:] 
try { 
    session.read(flowFile, { inputStream -> 
    def reader = new DataFileStream<>(inputStream, new GenericDatumReader<GenericRecord>()) 
    GenericRecord currRecord = null; 
    if(reader.hasNext()) { 
     currRecord = reader.next(); 
     log.info(genericData.toString(currRecord)) 
     record = slurper.parseText(genericData.toString(currRecord)) 
     pathAttrs?.each {k,v -> 
     object = record 
     v.value.tokenize('.').each { 
      object = object[it] 
      } 
      newAttrs[k - "avro.path."] = String.valueOf(object) 
     } 
     reader.close() 
    } 
} as InputStreamCallback) 
newAttrs.each{k,v -> 
    flowFile = session.putAttribute(flowFile, k,v) 
} 
session.transfer(flowFile, REL_SUCCESS) 
} catch(e) { 
    log.error("Error during Avro Path: {}", [e.message] as Object[], e) 
    session.transfer(flowFile, REL_FAILURE) 
} 

如果你的意思是提取Avro的元數據VS字段(不完全知道你是什麼意思「屬性「),還請檢查MergeContent的AvroMerge,因爲在那裏有一些代碼可以提取Avro元數據:

0

如果您要從每個流文件的單個Avro記錄中提取簡單模式,那麼ExtractText可能就足夠了。如果您想利用Apache NiFi 1.3.0中提供的新記錄處理功能,則應該從AvroReader開始,並且有a seriesblogs詳細說明了此process。您還可以使用ExtractAvroMetadata提取Avro元數據。