我打算把這個答案分成幾個部分,因爲這裏有很多事情要做。
我想知道是否有辦法在NiFi的流文件中發送帶有 屬性的空流文件?我想用這個作爲觸發器 來指示一種事件類型已經開始。
GenerateFlowFile
處理器允許您以常規運行計劃或使用CRON計劃發送空的(或填充的)流文件。您可以將其與UpdateAttribute
處理器結合使用,爲流文件添加任意靜態或動態屬性。
在NiFi中是否有任何其他方式可以指示一組事件 已啓動並完成?例如,如果我有三個處理器 讀入數據,我想知道第一個處理器是 即將被觸發,並且最後一個處理器已完成。無論如何,我有 嗎?
這接近批處理,Apache NiFi並沒有設計或優化。確定一個源處理器是「即將被觸發」是非常困難的。如果該處理器是以定時器/ CRON爲基礎觸發的,則可以知道該時間,但如果您的意思是「GetFile
即將成功檢索文件」,那麼這並不容易。可以使用自己的客戶處理器擴展處理器,並覆蓋onTrigger()
方法,以便在另一個處理器可以接受的DistributedMapCacheClientService
中存儲某些值。或者我想你可以將邏輯包裝在ExecuteScript
處理器中並編寫自定義通知代碼。我不確定的目標在這裏 - 誰得知這個狀態變化通知?它是另一個處理器,人類觀察者還是外部服務?
如果處理器繼續運行,我希望能夠將從處理器1讀取到處理器3的數據分組爲 。爲了使 這更清楚
Begin Processor1 Processor2 Processor3 End Begin Processor1 Processor2 Processor3 End ...
不過,我相信你所要求的是可能與使用新Wait
和Notify
處理器。 Koji Kawamura寫了一篇很好的文章描述他們的使用here。
我認爲在這種情況下,你需要特殊的內容或屬性能夠檢測通過該系統傳來的批次,除非它是數據的一個單元在同一時間。我會盡量在下面描述兩種情況,但我沒有太多的上下文。
方案1(數據的單一單位)
隨意替換爲不同的源的處理器,但我使用GetFile
爲了簡單起見。
假設你有一個目錄的完整文本文件(通過一些外部進程放在那裏)。每個文件都以「名字姓氏」的形式包含文本,並且其名稱爲Lastname_YYYY-MM-DD-HH-mm-ss.txt
,其中寫入的時間戳填充文件名。
GetFile -> ReplaceText -> PutFile
GetFile
處理器會將每個文件作爲單獨的流文件引入。從那裏,ReplaceText
可以做一些簡單的事情,比如使用正則表達式切換名稱的順序,PutFile
將內容寫回到文件系統。當GetFile
被觸發第一次,它將派遣ň flowfiles到連接/隊列ReplaceText
。如果你想讓它等待和並行執行的操作,而不是線性的,你可以在成功隊列中的回壓設定爲1
flowfile防止以上的處理器(GetFile
)運行,直到隊列爲空一次。
方案2(多flowfiles必須被組合在一起,並結合手術)
這裏,你會想用MergeContent
收集多個flowfiles到一個單一的一個。您可以在bin門檻設置爲ñ flowfiles,當它達到傳入flowfiles的最小數量的MergeContent
處理器將只傳輸一個成功 flowfile。您也可以按屬性進行分類,因此,如果您從異構輸入源讀取數據,則仍然可以根據共同特徵關聯相關聯的數據片段。
與Wait
& Notify
可選場景,此外,還可以使用Notify
處理器觸發flowfile發送到相應的Wait
處理器「釋放」的「內容」 flowfiles他們所需的目的地。同樣,Koji的文章上面鏈接了一個示例流程和一些截圖詳細解釋了這一點。
我希望這至少給你一個方向可循。沒有更多的上下文,我仍然可以感受到你正試圖在這裏解決非NiFi問題,或者也許可以調整你的數據流模型來更好地支持流式思路。如果您有更多信息,我很樂意擴大答案。
非常感謝您抽出時間給出詳細的回覆 - 無法告訴您我多麼感激!你的答案非常清晰,易於理解。我想我正在尋找的是等待和通知 - 我肯定會更多地探索這個選項,謝謝你指出這一點! – BigBug