2017-04-09 93 views
0

以下是代碼: 代碼行爲是生成從0n的數字的功率集合。PySpark中的重複計算

C - 代表它由大小l

凡所有集合的迭代l大小l + 1的所有可能的超集列舉爲C套和存儲回C的RDD。

根據在此示例中通過隨機生成器的輸出條件證明的標準,枚舉中排除了一些組。

from pyspark import SparkConf 
from pyspark import SparkContext 

from bitarray import bitarray 
import random 

def setadd(u, i): 
    r = u.copy() 
    r[i] = 1 
    return r 

def stringToBit(u): 
    r = bitarray() 
    r.frombytes(u) 
    return r 

def mapFunc(it): 
    global bdTH 
    global bdN 

    for s in it: 
     s = stringToBit(s[0]) 
     print(s) 
     r = random.randint(1, 10) 
     # elimination criteria 
     if r < bdTH.value: 
      continue 

     xmax = n - 1 
     while not s[xmax]: 
      xmax -= 1 

     for x in xrange(xmax + 1, bdN.value): 
      if s[x]: 
       continue 
      ns = setadd(s, x) 
      yield (ns.tobytes(), 0) 

def main(sc, n): 
    phi = bitarray('0') * n 
    C = [(setadd(phi, x).tobytes(), 0) for x in xrange(n)] 
    print(C) 
    C = sc.parallelize(C) 

    global bdN 
    bdN = sc.broadcast(n) 

    global bdTH 
    bdTH = sc.broadcast(random.randint(1, 10)) 

    l = 1 
    while l <= n: 
     C = C.partitionBy(100)\ 
      .mapPartitions(mapFunc) 

     l += 1 

     if C.count(): 
      print('count: ' + str(C.count())) 
     else: 
      print('count: 0') 

     bdTH = sc.broadcast(random.randint(1, 10)) 


if __name__ == "__main__": 
    conf = SparkConf() 
    conf = conf.setAppName("test") 
    sc = SparkContext(conf = conf) 

    n = 5 
    main(sc, n) 
    sc.stop() 

問題: 1.由於代碼確實是保證不評價任何兩次集。但是,輸出確實表明某些集合被評估了兩次。
2.變量bdTHbroadcast是否保證只有在爲迭代l生成了C或者Spark可以執行一些優化之後才發送。

Issue1

回答

0

火花RDDS懶惰,並且除非被高速緩存和/或檢查點,其完整的譜系將每個RDD被訪問時進行評價。

因爲你不緩存和C依賴於以前的Cs星火將評估C_i一次:

  • 對於C_i.count()
  • 對於C_i+1.count()

因爲你partition星火可以重用洗牌的文件,否則它會遞歸地回溯到第一個RDD。

+0

你可以在項目符號列表中清除你的語言嗎? –