0
以下是代碼: 代碼行爲是生成從0
到n
的數字的功率集合。PySpark中的重複計算
C
- 代表它由大小l
凡所有集合的迭代l
大小l + 1
的所有可能的超集列舉爲C
套和存儲回C
的RDD。
根據在此示例中通過隨機生成器的輸出條件證明的標準,枚舉中排除了一些組。
from pyspark import SparkConf
from pyspark import SparkContext
from bitarray import bitarray
import random
def setadd(u, i):
r = u.copy()
r[i] = 1
return r
def stringToBit(u):
r = bitarray()
r.frombytes(u)
return r
def mapFunc(it):
global bdTH
global bdN
for s in it:
s = stringToBit(s[0])
print(s)
r = random.randint(1, 10)
# elimination criteria
if r < bdTH.value:
continue
xmax = n - 1
while not s[xmax]:
xmax -= 1
for x in xrange(xmax + 1, bdN.value):
if s[x]:
continue
ns = setadd(s, x)
yield (ns.tobytes(), 0)
def main(sc, n):
phi = bitarray('0') * n
C = [(setadd(phi, x).tobytes(), 0) for x in xrange(n)]
print(C)
C = sc.parallelize(C)
global bdN
bdN = sc.broadcast(n)
global bdTH
bdTH = sc.broadcast(random.randint(1, 10))
l = 1
while l <= n:
C = C.partitionBy(100)\
.mapPartitions(mapFunc)
l += 1
if C.count():
print('count: ' + str(C.count()))
else:
print('count: 0')
bdTH = sc.broadcast(random.randint(1, 10))
if __name__ == "__main__":
conf = SparkConf()
conf = conf.setAppName("test")
sc = SparkContext(conf = conf)
n = 5
main(sc, n)
sc.stop()
問題: 1.由於代碼確實是保證不評價任何兩次集。但是,輸出確實表明某些集合被評估了兩次。
2.變量bdTH
的broadcast
是否保證只有在爲迭代l
生成了C
或者Spark可以執行一些優化之後才發送。
你可以在項目符號列表中清除你的語言嗎? –