2017-01-16 50 views
0

我有一個PySpark的示例工作,它是PageRank算法的一個版本。 代碼如下:PySpark速度Ubuntu vs Windows

from __future__ import print_function 
from operator import add 
import timeit 
from pyspark.sql import SparkSession 

# Normalize a list of pairs(url, rank) to 1 
def normalize(ranks): 
    norm = sum([rank for u, rank in ranks]) 
    ranks = [(u, rank/norm) for (u, rank) in ranks ] 
    return sorted(ranks, key=lambda x: x[1], reverse=True) 

def pagerank_2(edgeList, n, niter): 
    # Loads all URLs from input file and initialize their neighbors. 
    m = edgeList.groupByKey().cache() 
    s = 0.85 

    # Loads all URLs with other URL(s) link to from input file 
    # and initialize ranks of them to one. 
    q = spark.sparkContext.range(n).map(lambda x: (x, 1.0)).cache() 
    r = spark.sparkContext.range(n).map(lambda x: (x, 0.0)).cache() 

    # Calculates and updates URL ranks continuously 
    # using PageRank algorithm. 
    for iteration in range(niter): 
     # Calculates URL contributions to the rank of other URLs. 
     # Add URL ranks based on neighbor contributions. 
     # Do not forget to add missing values in q and set to 0.0 
     q = q.fullOuterJoin(m)\ 
      .flatMap(lambda x: (x[1][1] and [(u, x[1][0]/len(x[1][1])) for u in x[1][1]]) or [])\ 
      .reduceByKey(add)\ 
      .rightOuterJoin(r)\ 
      .mapValues(lambda x: (x[0] or 0)*s + (1-s)) 
     print("iteration = ", iteration) 

    # Collects all URL ranks and dump them to console after normalization 
    ranks = normalize(q.collect()) 
    print(ranks[0:10]) 


if __name__ == "__main__": 

    spark = SparkSession\ 
      .builder\ 
      .master('local[*]')\ 
      .appName("SparkPageRank")\ 
      .config('spark.driver.allowMultipleContexts', 'true')\ 
      .config('spark.sql.warehouse.dir', 'file:///C:/Home/Org/BigData/python/BE4/') \ 
      .config('spark.sql.shuffle.partitions', '10')\ 
      .getOrCreate() 

    spark.sparkContext.setLogLevel('WARN') 

    g = [(0, 1), (0, 5), (1, 2), (1, 3), (2, 3), 
     (2, 4), (2, 5), (3, 0), (5, 0), (5, 2)] 
    n = 6 
    edgeList = spark.sparkContext.parallelize(g) 
    print(timeit.timeit('pagerank_2(edgeList, 6, 10)', number=1, globals=globals())) 

節點編號從0到n-1。 edgeList參數是一個RDD,它包含一對節點對(也稱爲邊)。

我在本地模式下在Windows 10(Anaconda,Spark 2.1.0,winutils)上運行它。 這項工作分配爲2896個任務,都非常輕。

我的問題是運行時間。 以上例爲:

  • Windows 10:> 40mn!
  • 視窗子系統用於Linux(Ubuntu的14.04):30秒

該計算機是便攜式計算機核心i7-4702HQ,16GB內存,512GB SSD。 當談到啓動過程時,Windows比Linux慢,但速度慢了50倍?肯定要做些什麼來縮小這個差距?

我已禁用的Windows Defender爲所有處於危險中的文件:java目錄,蟒蛇目錄等 要看看任何其他的想法?

感謝您的任何線索。

回答

0

也許關鍵是本地[*]這意味着

運行與儘可能多的工作線程爲您 機器上邏輯內核本地的火花。

嘗試使用諸如地方[10]

+0

我不這麼認爲。這臺機器上有8個內核,接近於10.無論如何,相同的參數適用於Windows和Ubuntu/WSL。 – FabPop