2017-05-29 27 views
0

如果這個問題已經得到解答,我很抱歉。我確實看過檔案,但沒有找到與我的問題相關的答案。爲什麼本地[*]不使用我機器中的所有可用內核?

我是Spark新手。我試圖運行本地並行附加的簡單示例,在我的MacOS Sierra機器中使用spark-2.1.1。由於我有4個內核,每個任務需要10秒才能完成4個任務,所以我希望總共花費10秒以上。

我看到每個任務都需要預計的時間。但在我看來,似乎只有2個執行線程。我期待着4.正如你在代碼中看到的那樣,每個元組的值就是相應任務的執行時間。

insight086:pyspark lquesada $多個輸出/部分00000

(u'1', 10.000892877578735) 
(u'3', 10.000878095626831) 

insight086:pyspark lquesada $多個輸出/部分00001

(u'2', 10.000869989395142) 
(u'4', 10.000877857208252) 

而且該走的總時間是相當多的超過20秒:

total_time 33.2253439426 

在此先感謝您的幫助!

乾杯, 路易斯

輸入文件:

1 
2 
3 
4 

SCRIPT:

from pyspark import SparkContext 
import time 

def mymap(word): 
    start = time.time() 
    time.sleep(10) 
    et=time.time()-start 
    return (word, et) 

def main(): 
    start = time.time() 
    sc = SparkContext(appName='SparkWordCount') 

    input_file = sc.textFile('/Users/lquesada/Dropbox/hadoop/pyspark/input.txt') 
    counts = input_file.flatMap(lambda line: line.split()) \ 
        .map(mymap) \ 
        .reduceByKey(lambda a, b: a + b) 
    counts.saveAsTextFile('/Users/lquesada/Dropbox/hadoop/pyspark/output') 

    sc.stop() 
    print 'total_time',time.time()-start 

if __name__ == '__main__': 
    main() 
+0

該數據集非常小,以至於不可能用它來證明任何東西...... – eliasah

+0

我的實際問題是所使用的數字核心。不過,我確實很欣賞你指出「核心數量不一致時性能數字不一致」的事實,因爲它肯定與我對開銷的擔憂有關。 –

回答

0

這就是爲什麼Divide and conquer algorithms其閾值,其中是有道理的,在使用它們。在Spark中添加分配(與並行),你有一個很好的機制來做這樣一個小巧的計算。你根本沒有利用Spark的這個4元素數據集的優勢。

假設隨着數據量越來越大,時間會圍繞您的期望收斂。

此外,讀取本地數據集時分區的數量最多爲2個,因此沒有repartitioning您僅使用2個內核。

再分配(numPartitions的:int)(隱式ORD:訂貨[T] = NULL):RDD [T]返回恰好具有numPartitions分區,一個新的RDD。

可以增加或減少此RDD中的並行性級別。在內部,這使用混洗重新分配數據。

如果您減少此RDD中分區的數量,請考慮使用coalesce,這可避免執行洗牌。


local[*]意味着爲您的計算機是否具備使用盡可能多的內核(參見案例LOCAL_N_REGEXSparkContext):

def localCpuCount: Int = Runtime.getRuntime.availableProcessors() 
val threadCount = if (threads == "*") localCpuCount else threads.toInt 

這只是一個提示多少分區默認使用,但並不能阻止Spark上升或下降。它主要取決於Spark的優化,以最終實現分佈式計算的最佳執行計劃。 Spark對你來說確實不少,抽象層次越高優化越多(請參閱Spark SQL優化器中的batches)。

相關問題