2016-12-05 70 views
2

我使用Spark Streaming v2.0.0從Kafka檢索日誌並進行一些操作。我正在使用功能mapWithState以保存和更新與設備相關的某些字段。我想知道這個函數如何在集羣中工作。事實上,我現在只是使用獨立模式,但稍後我會嘗試使用Yarn羣集。Spark Streaming:mapWithState函數如何在集羣中工作?

但是,假設我有一個有多個節點的集羣,如果一個節點更新設備的狀態,他是否立即通知此更新的所有其他節點?如果否,則需要設置羣集中的mapWithState函數。我該怎麼做?

回答

3

但是,假設我有一個有多個節點的集羣,如果一個節點更新設備的狀態,他是否立即通知此更新的所有其他節點?如果否,則需要設置集羣中的mapWithState函數。

這不是mapWithState的工作原理。 mapWithState是一個洗牌階段,這意味着它會導致羣集中的數據移動。這如何影響mapWithState?每個條目(鍵值對)將被洗牌到一個特定的執行者。在隨後到達同一密鑰時,無論執行者在給定時間從輸入流處理它,它都將被洗牌到持有內存映射的節點和先前消息的狀態。這是通過HashPartitioner默認完成的,它會散列密鑰,然後將其發送到保存狀態的正確執行器,這就是爲什麼您需要仔細選擇密鑰。

這意味着特定密鑰的狀態不會散佈在整個羣集中。它被分配到集羣內的一個特定的執行器,並且每次基於密鑰的哈希值,傳入的數據將保持回到那個。

+1

謝謝Yuval,這非常清楚! –

+0

Hi Yuval,你有關於HashPartitioner的任何文檔。我只有這個鏈接,但我不明白它是如何工作的。 http://spark.apache.org/docs/2.0.2/api/java/index.html?org/apache/spark/HashPartitioner.html –

+1

http://stackoverflow.com/questions/31424396/how-does- hashpartitioner-work –

0

所有有狀態轉換都通過密鑰對數據進行混洗,因此特定密鑰的所有值都在同一個執行程序線程上處理。

不需要額外的同步和一個特定的鍵狀態總是一致的。

+0

當你說「同一臺機器」時,你的意思是「同一個節點」?如果我不瞭解你寫的內容,那麼節點之間已經有了一個關於變換的同步? –

+0

我的意思是執行者線程。 – user7252138

-1

檢查點是作爲目錄提供的,所以可以從本地文件系統,NFS掛載,HDFS託管或S3託管!

現在,考慮YARN + HDFS組合。由於mapWithState而寫入檢查點的任何數據將根據狀態密鑰分佈在不同的HDFS節點上,並且spark會嘗試在相同節點上調度依賴於它的任務。

但是如果你考慮的話,YARN + NFS(可能根本不合邏輯)。每個節點應該在相同的掛載點掛載NFS,並且每個讀/寫請求將是一個NFS請求。這將創造一個完美的瓶頸!

讓我們假設,狀態管理用戶會話。我們可能會選擇保留每個用戶少量的信息或多個GB信息。狀態中的鍵應該以某種方式唯一標識用戶,並且每次觸發mapWithState函數時,都可以訪問保存在該用戶狀態中的所有信息。

+0

我不會檢查點到S3;檢查點依賴於重命名來提交檢查點,並且在非常緩慢且不是原子的對象存儲上。 –

相關問題