上下文
Spark提供的RDD可以使用map函數來懶惰地設置並行處理操作。可以使用指定的分區參數來創建RDD,該分區參數確定每個RDD需要創建多少個分區,最好是該參數等於系統數量(例如,您有12個要處理的文件,創建一個包含3個分區的RDD,將RDD分割爲每個系統4個,每個系統同時處理所有文件)。據我瞭解,這些分區控制着去往每個系統進行處理的數據部分。數據從Spark中的RDDs以什麼順序獲取進程?
問題
我需要微調並控制很多功能在每個系統同時運行。如果兩個或更多功能同時在同一GPU上運行,則系統將崩潰。
問題
如果RDD沒有很好地均勻分割(如在上面的例子),多少個線程的系統上同時運行?
例
在:
sample_files = ['one.jpg','free.jpg','two.png','zero.png',
'four.jpg','six.png','seven.png','eight.jpg',
'nine.png','eleven.png','ten.png','ten.png',
'one.jpg','free.jpg','two.png','zero.png',
'four.jpg','six.png','seven.png','eight.jpg',
'nine.png','eleven.png','ten.png','ten.png',
'eleven.png','ten.png']
CLUSTER_SIZE = 3
example_rdd = sc.parallelize(sample_files, CLUSTER_SIZE)
example_partitions = example_rdd.glom().collect()
# Print elements per partition
for i, l in enumerate(example_partitions): print "parition #{} length: {}".format(i, len(l))
# Print partition distribution
print example_partitions
# How many map functions run concurrently when the action is called on this Transformation?
example_rdd.map(lambda s: (s, len(s))
action_results = example_rdd.reduceByKey(add)
日期:
parition #0 length: 8
parition #1 length: 8
parition #2 length: 10
[ ['one.jpg', 'free.jpg', 'two.png', 'zero.png', 'four.jpg', 'six.png', 'seven.png', 'eight.jpg'],
['nine.png', 'eleven.png', 'ten.png', 'ten.png', 'one.jpg', 'free.jpg', 'two.png', 'zero.png'],
['four.jpg', 'six.png', 'seven.png', 'eight.jpg', 'nine.png', 'eleven.png', 'ten.png', 'ten.png', 'eleven.png', 'ten.png'] ]
在結論
我需要知道,如果RDD拆分事情是這樣的,什麼控制同時處理多少個線程?它是核心數量,還是有一個全局參數可以設置,因此它只能在每個分區(系統)上一次處理4個?
你有沒有試過看Spark界面?運行示例後,我會爲每個collect()操作獲取3個任務,這是您在創建RDD時提供的分區數。除此之外,並行運行的任務數量取決於Spark應用程序配置。例如,如果你使用'pyspark --master local [8]'開始,並且你至少有8個內核,它將能夠同時並行8個任務。 –
自從工作很快就開始,我還沒有。我會看看,但我認爲UI會給出一個最高級別的視圖,說明有多少工作人員正在運行,而不是每個工作人員正在處理的併發線程數量。好吧,那麼你是說如果我將參數限制爲--master local [4],它應該只能同時並行4個任務?我會玩弄它,看看,謝謝。 – alfredox