2016-12-25 81 views
0

我運行一個60碼EC2實例Spark可以並行嵌套嗎?

from pyspark import SparkContext 
import time, md5 
workers_count = 10 
sc = SparkContext("local[%s]" % workers_count, "App Name") 
max_num = 50000000 
start_time = time.time() 
first_item = sc.parallelize(xrange(max_num)).map(lambda n: (n, md5.md5(str(n)).hexdigest())).reduce(lambda a,b: a if a[1] > b[1] else b) 
end_time = time.time() 
print("sorting took took %s seconds with %s workers" % (end_time-start_time, workers_count)) 

下面的代碼與1名工人花費52秒。 有2名工作者需要26秒。 與4名工人花費13秒 與8名工人花費6秒 具有16名或更多的工人花費4秒(更多或更少)

上面的代碼的內部,並且它需要運行幾百萬次

從上面我明白,並行化會提高性能有多少限制,這是可以的,但是因爲我使用的是60核心機器,並且我希望它能夠做到最好核心的使用,我希望每個循環使用8個核心,有7個循環同時運行。

是否可以爲每個函數定義它將使用多少個內核?

回答

0

Spark可以並行嵌套嗎?

它不能。 Spark的並行執行必須是平坦的。

您可以使用單獨的線程提交多個併發作業。例如使用joblib與線程和numSlices

import hashlib 
from joblib import Parallel, delayed 

def run(sc, numSlices=8): 
    return sc.range(0, max_num, numSlices=numSlices) \ 
     .map(lambda n: (n, hashlib.md5(str(n)).hexdigest())) \ 
     .reduce(lambda a,b: a if a[1] > b[1] else b) 

Parallel(n_jobs=7, backend="threading")(delayed(run)(sc) for _ in range(7)) 
+0

我同意,這將解決本地臺機器的問題,但與火花的想法是,它也可以用多節點集羣中工作,而這個解決方案將不會縮放火花路徑 – ZivF

+0

只要集羣中有足夠的資源,它就會按預期工作。 – user7337271

+0

訣竅是線程不是並行化的提交。 – user7337271