2017-02-14 140 views
1

我有一個用例,我必須以FIFO的方式處理事件。這些是機器產生的事件。每臺機器每30秒產生一個事件。對於特定的機器,我們需要根據FIFO fasion來處理事件。使用Spark Streaming進行FIFO處理?

我們需要每天處理大約2.4億個事件。對於如此巨大的規模,我們需要使用Kafka + Spark Streaming

從Kafka文檔中,我明白我們可以使用消息的關鍵字段將消息路由到特定的主題分區。這可以確保我可以使用機器ID作爲密鑰,並確保來自特定機器的所有消息落入相同的主題分區。

解決了50%的問題。

問題在處理方面。

Kafka Direct方法的spark文檔說RDD分區相當於Kafka分區。

因此,當我執行rdd.foreachPartition沒有任務迭代在有序fasion?

是否確保RDD的一個分區總是在一個執行器中?

是否確保foreachPartition任務僅由整個分區的一個線程執行?

請幫忙。

回答

1

假設您不使用任何重新分區數據的操作員(例如,repartition,reduceByKey,reduceByKeyAndWindow,...)。

因此,當我執行rdd.foreachPartition沒有任務迭代在有序fasion?

是的。它按照Kafka分區中的順序處理數據。

是否確保RDD的一個分區總是在一個執行器中?

是的。如果您不啓用speculation,則只有一個執行程序(任務)處理分區。如果速度太慢,speculation可能啓動另一個任務來運行相同的分區。

是否確保foreachPartition任務僅由整個分區的一個線程執行?

是的。它逐個處理一個分區中的數據。

+0

第一句在這個答案回答是非常重要的。任何混洗導致的操作都會導致數據「不同步」。如果這是一個問題,考慮明確地排序每臺計算機ID的事件。 –

0

從Kafka文檔中,我明白我們可以使用消息的關鍵字段將消息路由到特定的主題分區。這可以確保我可以使用機器ID作爲密鑰,並確保來自特定機器的所有消息落入相同的主題分區。

將數據發佈到Kafka時,您不需要使用機器ID。使用null作爲密鑰,kafka將在內部使用散列分區方案將數據適當地發送到不同的kafka主機。

問題在處理方面。

gotcha:當您在spark中處理時,它不會具有全局順序。例如:有5個事件(按時間排序):E0(最早),E1,E2,E3,E4(最新)

這些被路由到不同的卡夫卡分區:

Kakfa Partition P0: e0, e3 Kafka Partition P1: e1, e2, e4

所以當您正在閱讀您的火花作業,您將在另一個RDD中獲得e0, e3在一個RDD中,e1, e2, e4按順序獲得。

如果您想要全局排序(e0,e1,e2,e3,e4),您需要寫入單個分區的kafka。但是,那麼你將失去分區容忍並遇到一些性能問題(需要調整生產者和消費者)。 3000事件/秒應該沒問題,但是這也取決於你的kafka羣集。

你的其他問題已經由@zsxwing(see

相關問題