1

我使用Spark 2.1.1。如何分別處理Kafka分區並與Spark執行程序並行處理?

我使用結構化流式處理從2個Kafka分區讀取消息。我正在向Spark Standalone集羣提交我的應用程序,其中包含一名工作人員和兩名執行程序(每個2個核心)。

./bin/spark-submit \ 
    --class MyClass \ 
    --master spark://HOST:IP \ 
    --deploy-mode cluster \ 
    /home/ApplicationSpark.jar 

我想要的功能是,來自每個卡夫卡分區的消息應該由每個獨立的執行程序獨立處理。但是現在發生的事情是,執行者分別讀取和映射分區數據,但是在映射形成的無界表之後,通常使用並且具有來自兩個分區的數據。

當我在表上運行結構化查詢時,查詢必須處理來自兩個分區(更多數據量)的數據。

select product_id, max(smr.order_time), max(product_price) , min(product_price) 
from OrderRecords 
group by WINDOW(order_time, "120 seconds"), product_id 

其中卡夫卡分區是Product_id

是否有任何方式來運行結構化查詢平行相同的,但分開的數據,從卡夫卡分區到的執行器被映射?

+0

如何檢查_「執行者分別讀取和映射分區數據,但在映射所形成的無界表之後,通常會使用這些表,並使數據來自這兩個分區。你看執行計劃嗎?你能粘貼它嗎?你還用什麼來聲明這個聲明? –

+0

當我打印該exe文件時。計劃,它顯示**「沒有實際計劃」**。 _當我們打印計數查詢的輸出時我們看到了什麼;它顯示了跨分區的記錄總數。 我們沒有要求跨分區執行統一數據的查詢。我們需要的是獨立處理較小的數據集。 例如 卡夫卡分區 - P1,P2 P1包含產品1至10 P2包含產品11至20 當我們運行Spark流的兩個執行器時,我們希望對每個10個記錄的單個數據集進行操作。 – kadsank

回答

0

但是現在發生的情況是,執行者分別讀取和映射分區數據,但映射後形成的無界表通常被使用並且具有來自兩個分區的數據。因此,當我在表上運行結構化查詢時,查詢必須處理來自兩個分區的數據(更多數據量)。

這是理解什麼以及如何執行而不會導致跨分區拖動和發送數據(甚至可能通過線路)的關鍵。

確切的答案取決於您的查詢。如果他們工作的記錄組在多個主題分區上傳播,因此分佈在兩個不同的Spark執行程序上,則必須特別小心您的算法/轉換,以便在單獨的分區上執行處理(僅使用可用的分區)並僅彙總結果。