2017-05-28 76 views
0

我想創建一個自定義NiFi處理器,它可以讀取ESRi ASCII grid files並返回CSV類似於每個文件的一些元數據和WKT格式的地理參考用戶數據的表示。自定義nifi處理器 - 寫流文件

不幸的是,分析結果不寫回爲一個更新的流程文件。

https://github.com/geoHeil/geomesa-nifi/blob/rasterAsciiGridToWKT/geomesa-nifi-processors/src/main/scala/org/geomesa/nifi/geo/AsciiGrid2WKT.scala#L71-L107是我在使這種情況發生在NiFi嘗試。

不幸的是,只有原始文件被返回。轉換後的輸出不會持久。

當試圖適應它手動序列化一些CSV字符串,如:

val lineSep = System.getProperty("line.separator") 
    val csvResult = result.map(p => p.productIterator.map{ 
    case Some(value) => value 
    case None => "" 
    case rest => rest 
    }.mkString(";")).mkString(lineSep) 

    var output = session.write(flowFile, new OutputStreamCallback() { 
    @throws[IOException] 
    def process(outputStream: OutputStream): Unit = { 
     IOUtils.write(csvResult, outputStream, "UTF-8") 
    } 
    }) 

仍然沒有flowflies寫入。要麼從上面的問題仍然存在,或者我得到StreamState不關閉outputStream的異常。

它必須是一個缺少一點點,但我似乎無法找到丟失的位。

回答

3

改變像session.write流文件中的每個會話()方法返回文件的新版本,你必須傳遞這個新版本。

如果更改converterIngester您的文件()函數,你必須回到這個新版本調用函數轉移到關係。

+0

我看到了,我應該手動序列化爲csv作爲我的虛擬代碼大綱?或將WiFi處理序列化案例類到Avro爲我? –

+2

你應該如此吧。對於nifi文件內容只是一個二進制流。也許你可以檢查在nifi https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main中爲數據庫結果完成序列化的過程/java/org/apache/nifi/processors/standard/util/JdbcCommon.java – daggett