0

我在Apache NiFi中使用Kafka作爲緩衝系統創建數據攝取工作流程。我有一個3節點集羣運行相同的工作流程,每個節點有4個核心。由Kafka引起的NiFi工作流程中的瓶頸

我依賴於將數據移入和移出不同卡夫卡主題的幾個實例,這是工作流程中速度最慢的部分,並且性能方面非常不一致,因爲兩個相同的測試可能會有高達100%工期增加。

我們的發佈和使用Kafka處理器正在所有三個節點上運行,而我們的Kafka主題在三個經紀人中有3個分區。

有沒有人有任何想法會導致這種不一致,以及我能做些什麼來緩解它並加快工作流程?

+1

我們需要更多的細節... NiFi的版本是什麼?什麼版本的卡夫卡經紀人?什麼版本的Kafka處理器(0.9 vs 0.10)?你在PublishKafka前看到了流文件的構建,或者什麼是慢?您是否一次發佈並使用一條消息? –

+0

NiFi版本1.1.0,Kafka版本0.10.1.2.1,0.10 kafka處理器,在發佈kafka之前在隊列中建立文件,消耗kafka沒有按照需要快速返回文件,我相信它正在發佈和使用1個文件一段時間,不包括具體的批量邏輯。 – TomRobson

回答

3

單一最大的性能改進將是設計流程,以便每個流文件包含的消息數量較少且包含許多消息的流文件,而不是每個消息都有多個消息。

很難說如何爲您的用例做到這一點,因爲我不知道您的流量,如數據格式或您對每封郵件做什麼,但讓我們假裝您有CSV數據...目標是讓一個流文件包含多行CSV,而不是CSV的每行一個流文件。

在發佈方面,當您將此流文件發送到PublishKafka_0_10時,您將設置Message Demarcator屬性爲換行符(使用shift + enter),它會將CSV的每一行都傳送到Kafka。

在使用時,如果您還設置了消息分界符,那麼它會將許多消息寫入一個流文件,最多可以記錄最多的最大輪詢記錄。

此外,您可以嘗試調整每個處理器的併發任務(在調度選項卡上找到),以便並行執行更多發佈或消費。由於您有3個分區和3個NiFi節點,所以增加消費端的併發任務可能沒有太大好處,因此每個分區已經有一個線程,但是如果您有6個分區和3個NiFi節點,那麼您可能會受益於有2個併發任務。

+0

謝謝布萊恩,批量的想法工作! :) – TomRobson