2017-07-21 112 views
1

我有一個調用LogisticRegressionWithLBFGS x次的迭代。PySpark:LogisticRegressionWithLBFGS在迭代中變得越來越慢

問題是,迭代每個循環都變得越來越慢,最後永遠掛起。

我嘗試了很多不同的方法,但到目前爲止沒有運氣。

的代碼看起來像這樣:

def getBootsrapedAttribution(iNumberOfSamples, df): 

    def parsePoint(line): 
     return LabeledPoint(line[2], line[3:]) 

    aResults = {} 
    while x <= iNumberOfSamples: 
     print ("## Sample: " + str(x)) 
     a = datetime.datetime.now() 
     dfSample = sampleData(df) 
     dfSample.repartition(700) 
     parsedData = dfSample.rdd.map(parsePoint) 
     parsedData = parsedData.repartition(700) 
     parsedData.persist() 
     model = LogisticRegressionWithLBFGS.train(parsedData) 
     parsedData.unpersist() 
     b = datetime.datetime.now() 
     print(b-a) 
     x+=1 

def sampleData(df): 
    df = df.repartition(500) 
    dfFalse = df.filter('col == 0').sample(False, 0.00035) 
    dfTrue = df.filter('col == 1') 
    dfSample = dfTrue.unionAll(dfFalse) 
    return dfSample 


getBootsrapedAttribution(50, df) 

和輸出看起來是這樣的:

## Sample: 1 
0:00:44.393886 

## Sample: 2 
0:00:28.403687 

## Sample: 3 
0:00:30.884087 

## Sample: 4 
0:00:33.523481 

## Sample: 5 
0:00:36.107836 

## Sample: 6 
0:00:37.077169 

## Sample: 7 
0:00:41.160941 

## Sample: 8 
0:00:54.768870 

## Sample: 9 
0:01:01.31139 

## Sample: 10 
0:00:59.326750 

## Sample: 11 
0:01:37.222967 

## Sample: 12 

...hangs forever 

沒有model = LogisticRegressionWithLBFGS.train(parsedData)它在運行時的性能問題。

我的集羣看起來是這樣的:

spark.default.parallelism 500 
spark.driver.maxResultSize 20G 
spark.driver.memory 200G 
spark.executor.cores 32 
spark.executor.instances 2 
spark.executor.memory 124G 

有誰知道這個問題?

回答

1

我在回答我自己的問題。

問題出在方法LogisticRegressionWithLBFGS。用LogisticRegression代替Spark 2.1+解決了這個問題。每次迭代不會再減慢。

此外,還可以使用上面的代碼進行更多的改進。 rdd方法sample可以替換爲DataFrame方法sampleBy。這也將避免不必要union

.sampleBy('col', fractions={0: 0.00035, 1: 1}, seed=1234)

此外,所有在上面的代碼中的repartitions是不必要的。重要的是,df傳遞給getBootsrapedAttribution()分好,cached