2017-03-19 183 views
2

我想我有一個初學者的bug,但我真的不知道如何解決它,這讓我瘋狂。 我具有由2臺機器的羣集:Spark/Hadoop作業沒有運行在parralel

  1. 8GB RAM(6.9可用),4個芯,Win10:運行一個主,一個工人,而且它也從我正在運行的Java驅動程序程序(本機來自的IntelliJ)
  2. 2GB RAM(1.3可用),4芯的,Ubuntu 16.04的VM(拼命地跑在了VBox):運行一個工人

我有一個類網絡,我想生成一個for循環網絡從單一網絡的列表開始,然後通過使用平面映射將每個網絡變換成N個新網絡。之後,我有一個過濾器和一個計數。步驟:

JavaSparkContext sc = new JavaSparkContext(conf); 
    List<Network> data = Arrays.asList(new Network()); 
    JavaRDD<Network> currentN = sc.parallelize(data); 
    for(int k=1;k<=10;k++) { 
     JavaRDD<Network> newN = currentN.flatMap(new MyFlatMap()); 
     currentN = newN; 
    } 
    JavaRDD<Network> filteredNetworks = currentN.filter(new MyFilter()); 
    System.out.println(filteredNetworks.count()); 

算法工作並輸出正確的值。

然而,通過比較在不同的應用場景的持續時間,我傾向於認爲本申請並不平行拼命地跑:使用兩臺機器,總公羊和8核的2GB的

羣集:持續時間1,5分鐘,只使用第二機(VM Ubuntu的),1GB RAM

羣集,4芯:持續時間1,1分鐘

羣集僅使用所述第一機器(其是一名高手,一名工人,和司機),1GB內存,4芯:持續3,2分鐘

截圖我的火花UI的:

活着工人第一次運行的 Alive workers

事件時間表(帶兩臺機器): Event timeline of first run

通過執行程序聚合指標(用兩臺機器) Metrics

我不知道爲什麼在運行Ubuntu的一個虛擬機的機器比主機快(這是主,工人和驅動程序),因爲該主機具有更好的CPU (i7 2.6GHZ與i3 1,9GHZ相比)。

但是主要問題是爲什麼在單臺機器上運行比在兩臺機器上運行都快?難道不是相反嗎?我的猜測是RDD不是並行計算的。如果是這種情況,請你解釋爲什麼以及如何使它能夠並行處理?

解釋的工作做什麼:

基本上,這就是我想要的,以實現循環的:

我從1開始網絡的RDD(它不是一個文件,它的只是一個小班)。

在for循環中,我使用flatMap將1個網絡轉換爲10個新網絡。

Iteration0:currentN = 1網絡 - > flatMap - > currentN = 10個網絡

Iteration1:currentN = 10網絡 - > flatMap - > currentN = 100個網絡

。 。

Iteration9:currentN = 10^8個網絡 - > flatMap - > currentN = 10^9網絡

因此,正如我說,我生成輸入。我想這樣做並行生成,所以這意味着flatMap需要並行執行。爲了實現這一點,火花應:

  1. 採取RDD用N網絡

  2. 鴻溝RDD成每個芯8個分區,每個分區具有N/8網絡

  3. 應用flatMap並行地將每個N/8網絡轉換爲N/8 * 10個新網絡。

  4. 在每臺機器上重複上述步驟,並行生成flatMap。

  5. for循環結束後,每臺機器應該有10^9/8網絡。並行過濾它們,然後計算每臺機器上每個RDD中元素的數量,然後輸出答案。

這是我想實現的,但由於某些原因,for循環中的flatMap生成僅在一臺機器上完成。

回答

0

我終於設法解決了這個問題。這個錯誤是如此愚蠢和明顯,但花了我很長時間才弄清楚它...

正如我前面提到的,我不是從一個文件讀取輸入,而是生成從開始的輸入1 Network然後我在該網絡上做flatMap以獲得N個網絡,比我獲得N * M個新網絡等等。

但因爲我從只有1網絡,開始的時候我做

List<Network> data = Arrays.asList(new Network()); 
JavaRDD<Network> currentN = sc.parallelize(data); 

得到的數據只能在一個CPU並行,在一個任務,因爲RDD只包含一個元素,因此這個問題。

1

來自spark UI的最後一張截圖顯示,在您的8個任務中,7個已完成,最長時間爲37 ms,而最長時間的任務至少運行了46秒。

如果您有一項任務運行3分鐘,而其他任務運行時間不到一秒,則分佈式計算不平衡,因此您無法利用多臺計算機一起運行,因爲計算時間受到您的限制最長的任務。

這種行爲通常是由不平衡操作/轉換(join,...)引起的不平衡大小(1個Ko的7個文件和1個Go的1個文件)的輸入引起的。

最後,它很難解釋你的時機vs CPU而不知道你的工作是什麼,但一個潛在的解釋是你有一個數據密集型工作(而不是CPU密集型工作),因此瓶頸是硬盤驅動器最慢CPU的機器上的SSD)。

+1

我使用** conf.set(「spark.default.parallelism」,「8」)**使集羣使用所有8個核心,這就是爲什麼UI顯示8個任務,但只有一個真正的計數階段,這基本上是唯一的工作。正如你所看到的,我不使用文件作爲輸入,但我寧願使用flatMap在for循環的每次迭代中生成輸入。所以我真的不知道爲什麼for循環中的生成階段沒有並行運行。 –

+0

我編輯了原始問題並添加了我想要實現的解釋。也許這有助於你理解發生了什麼,因爲我真的需要這個幫助。 –

+0

如果在循環中使用「repartition」操作符會發生什麼情況。 – glefait