2

上下文
Spark提供的RDD可以使用map函數來懶惰地設置並行處理操作。可以使用指定的分區參數來創建RDD,該分區參數確定每個RDD需要創建多少個分區,最好是該參數等於系統數量(例如,您有12個要處理的文件,創建一個包含3個分區的RDD,將RDD分割爲每個系統4個,每個系統同時處理所有文件)。據我瞭解,這些分區控制着去往每個系統進行處理的數據部分。數據從Spark中的RDDs以什麼順序獲取進程?

問題
我需要微調並控制很多功能在每個系統同時運行。如果兩個或更多功能同時在同一GPU上運行,則系統將崩潰。

問題
如果RDD沒有很好地均勻分割(如在上面的例子),多少個線程的系統上同時運行?


在:

sample_files = ['one.jpg','free.jpg','two.png','zero.png', 
       'four.jpg','six.png','seven.png','eight.jpg', 
       'nine.png','eleven.png','ten.png','ten.png', 
       'one.jpg','free.jpg','two.png','zero.png', 
       'four.jpg','six.png','seven.png','eight.jpg', 
       'nine.png','eleven.png','ten.png','ten.png', 
       'eleven.png','ten.png'] 
CLUSTER_SIZE = 3 
example_rdd = sc.parallelize(sample_files, CLUSTER_SIZE) 
example_partitions = example_rdd.glom().collect() 

# Print elements per partition 
for i, l in enumerate(example_partitions): print "parition #{} length: {}".format(i, len(l)) 
# Print partition distribution 
print example_partitions 

# How many map functions run concurrently when the action is called on this Transformation? 
example_rdd.map(lambda s: (s, len(s)) 
action_results = example_rdd.reduceByKey(add) 

日期:

parition #0 length: 8 
parition #1 length: 8 
parition #2 length: 10 
[ ['one.jpg', 'free.jpg', 'two.png', 'zero.png', 'four.jpg', 'six.png', 'seven.png', 'eight.jpg'], 
['nine.png', 'eleven.png', 'ten.png', 'ten.png', 'one.jpg', 'free.jpg', 'two.png', 'zero.png'], 
['four.jpg', 'six.png', 'seven.png', 'eight.jpg', 'nine.png', 'eleven.png', 'ten.png', 'ten.png', 'eleven.png', 'ten.png'] ] 

在結論
我需要知道,如果RDD拆分事情是這樣的,什麼控制同時處理多少個線程?它是核心數量,還是有一個全局參數可以設置,因此它只能在每個分區(系統)上一次處理4個?

+0

你有沒有試過看Spark界面?運行示例後,我會爲每個collect()操作獲取3個任務,這是您在創建RDD時提供的分區數。除此之外,並行運行的任務數量取決於Spark應用程序配置。例如,如果你使用'pyspark --master local [8]'開始,並且你至少有8個內核,它將能夠同時並行8個任務。 –

+0

自從工作很快就開始,我還沒有。我會看看,但我認爲UI會給出一個最高級別的視圖,說明有多少工作人員正在運行,而不是每個工作人員正在處理的併發線程數量。好吧,那麼你是說如果我將參數限制爲--master local [4],它應該只能同時並行4個任務?我會玩弄它,看看,謝謝。 – alfredox

回答

1

數據從Spark中的RDDs獲取數據的順序是什麼?

除非是一些邊界情況,例如只有一個分區,否則順序是任意的或不確定的。這將取決於羣集,數據和不同的運行時事件。

許多分區只爲給定階段設置了總體並行性的限制,換句話說,它是Spark中最小的並行性單位。無論您分配多少資源,單個階段都應該處理比當時更多的數據。當員工不可訪問並且任務在另一臺計算機上重新安排時,可能會出現邊界情況。

你可以想到的另一個可能的限制是執行程序線程的數量。即使增加分區數量,單個執行程序線程也只能處理一個分區。

上述兩者都不能告訴你處理給定分區的位置或時間。雖然您可以在配置級別使用一些骯髒,低效和不可移植的技巧(如單個worker,每個機器只有一個執行程序線程),以確保在特定機器上只處理一個分區,而不是特定的分區一般來說很有用。作爲一個經驗法則,我會說Spark代碼不應該關注它執行的時間。 API有一些低級別的方面,它提供了設置分區特定偏好的手段,但據我所知這些方面並不提供硬性保證。

話雖這麼說,一個能想到的至少幾個方法可以解決這個問題:

  • 長期運行的執行線程與配置水平保證 - 這是可以接受的,如果Spark是隻用於加載和保存數據負責
  • 單一對象,其控制在GPU上
  • 委託GPU處理的專業服務,確保正確接入作業排隊

另一方面,您可能對Large Scale Distributed Deep Learning on Hadoop Clusters感興趣,它粗略地描述了可在此適用的體系結構。

+0

我懷疑執行的順序可能是任意的,這就是爲什麼我關心這個實現並且無法擁有一個控制參數,因此沒有兩個進程在同一個GPU上運行。不過,我覺得我可能有一個解決方案。我想寫一個GPU忙的數據庫,併發進程只會選擇一個空閒的GPU或等待一個空閒的GPU,然後去嘗試。 與雅虎CaffeOnSpark鏈接真棒,但他們只用於培訓,它沒有跡象表明它可以用caffemodel文件檢測,我會研究。謝謝! – alfredox

相關問題