我想創建一個自定義NiFi
處理器,它可以讀取ESRi ASCII grid files
並返回CSV
類似於每個文件的一些元數據和WKT格式的地理參考用戶數據的表示。自定義nifi處理器 - 寫流文件
不幸的是,分析結果不寫回爲一個更新的流程文件。
https://github.com/geoHeil/geomesa-nifi/blob/rasterAsciiGridToWKT/geomesa-nifi-processors/src/main/scala/org/geomesa/nifi/geo/AsciiGrid2WKT.scala#L71-L107是我在使這種情況發生在NiFi嘗試。
不幸的是,只有原始文件被返回。轉換後的輸出不會持久。
當試圖適應它手動序列化一些CSV字符串,如:
val lineSep = System.getProperty("line.separator")
val csvResult = result.map(p => p.productIterator.map{
case Some(value) => value
case None => ""
case rest => rest
}.mkString(";")).mkString(lineSep)
var output = session.write(flowFile, new OutputStreamCallback() {
@throws[IOException]
def process(outputStream: OutputStream): Unit = {
IOUtils.write(csvResult, outputStream, "UTF-8")
}
})
仍然沒有flowflies寫入。要麼從上面的問題仍然存在,或者我得到StreamState不關閉outputStream的異常。
它必須是一個缺少一點點,但我似乎無法找到丟失的位。
我看到了,我應該手動序列化爲csv作爲我的虛擬代碼大綱?或將WiFi處理序列化案例類到Avro爲我? –
你應該如此吧。對於nifi文件內容只是一個二進制流。也許你可以檢查在nifi https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main中爲數據庫結果完成序列化的過程/java/org/apache/nifi/processors/standard/util/JdbcCommon.java – daggett