我發現沒有辦法在NiFi中直接從Avro中提取屬性,所以我使用ConvertAvroToJson-> EvaluateJsonPath - > ConvertJsonToAvro作爲解決方法。需要使用NiFi直接從Avro提取屬性
但我想編寫一個腳本來從Avro流文件中提取屬性用於ExecuteScript處理器,以確定它是否是更好的方法。
有沒有人有腳本來做到這一點?否則,我可能會最終使用原來的方法。
感謝,
凱文
我發現沒有辦法在NiFi中直接從Avro中提取屬性,所以我使用ConvertAvroToJson-> EvaluateJsonPath - > ConvertJsonToAvro作爲解決方法。需要使用NiFi直接從Avro提取屬性
但我想編寫一個腳本來從Avro流文件中提取屬性用於ExecuteScript處理器,以確定它是否是更好的方法。
有沒有人有腳本來做到這一點?否則,我可能會最終使用原來的方法。
感謝,
凱文
這裏的Groovy腳本(這需要在它的模塊目錄屬性的Avro JAR),我讓用戶指定與JSONPath表達的動態性能,對Avro的文件進行評估。諷刺的是它確實GenericData.toString()
這無論如何轉換記錄JSON,但或許是這裏的一些代碼可以重用:
import org.apache.avro.*
import org.apache.avro.generic.*
import org.apache.avro.file.*
import groovy.json.*
import org.apache.commons.io.IOUtils
import java.nio.charset.*
flowFile = session.get()
if(!flowFile) return
final GenericData genericData = GenericData.get();
slurper = new JsonSlurper().setType(JsonParserType.INDEX_OVERLAY)
pathAttrs = this.binding?.variables?.findAll {attr -> attr.key.startsWith('avro.path')}
newAttrs = [:]
try {
session.read(flowFile, { inputStream ->
def reader = new DataFileStream<>(inputStream, new GenericDatumReader<GenericRecord>())
GenericRecord currRecord = null;
if(reader.hasNext()) {
currRecord = reader.next();
log.info(genericData.toString(currRecord))
record = slurper.parseText(genericData.toString(currRecord))
pathAttrs?.each {k,v ->
object = record
v.value.tokenize('.').each {
object = object[it]
}
newAttrs[k - "avro.path."] = String.valueOf(object)
}
reader.close()
}
} as InputStreamCallback)
newAttrs.each{k,v ->
flowFile = session.putAttribute(flowFile, k,v)
}
session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
log.error("Error during Avro Path: {}", [e.message] as Object[], e)
session.transfer(flowFile, REL_FAILURE)
}
如果你的意思是提取Avro的元數據VS字段(不完全知道你是什麼意思「屬性「),還請檢查MergeContent的AvroMerge,因爲在那裏有一些代碼可以提取Avro元數據:
如果您要從每個流文件的單個Avro記錄中提取簡單模式,那麼ExtractText
可能就足夠了。如果您想利用Apache NiFi 1.3.0中提供的新記錄處理功能,則應該從AvroReader
開始,並且有a series的blogs詳細說明了此process。您還可以使用ExtractAvroMetadata
提取Avro元數據。
你能解釋一下你想要在提取值後做什麼嗎? –