2017-10-11 51 views
2

我在名爲part-0001part-0002等的Linux機器上的單個目錄中有大約200個文件。每個行擁有大約一百萬行相同的列(稱爲'a','b'等等)。讓'a','b'作爲每行的關鍵字(包含許多重複項)。並行化在pyspark中的Spark數據幀組

同時,我建立了一個Spark主機和兩個從機的Spark 2.2.0羣集,共有42個內核可用。地址是spark://XXX.YYY.com:7077

然後,我使用PySpark連接到羣集,並按如下方式計算每個唯一對的200個文件的計數。

from pyspark import SparkContext 
from pyspark.sql import SQLContext 
import pandas as pd 

sc = SparkContext("spark://XXX.YYY.com:7077") 
sqlContext = SQLContext(sc) 

data_path = "/location/to/my/data/part-*" 
sparkdf = sqlContext.read.csv(path=data_path, header=True) 
dfgrouped = sparkdf.groupBy(['a','b']) 
counts_by_group = dfgrouped.count() 

這樣做的效果是,我可以看到Spark在一系列消息中前進,它確實返回看似合理的結果。

問題:雖然正在執行此計算,但top並未顯示任何證據表明從屬內核正在執行任何操作。似乎沒有任何並行化。每個從機都有一個在作業之前存在的單個相關Java進程(以及來自其他用戶和後臺系統進程的進程)。所以看起來主人正在做所有的工作。鑑於有200個奇怪的文件,我預計會看到21個進程在每個從機上運行,​​直到事情結束(這個我看到當我在一個單獨的實現中明確調用parallelize時,如下count = sc.parallelize(c=range(1, niters + 1), numSlices=ncores).map(f).reduce(add))。

問題:如何確保Spark實際上並行計數?我希望每個核心都能抓取一個或多個文件,對它在文件中看到的配對進行計數,然後將各個結果縮減爲一個DataFrame。我不應該在頂部看到這個嗎?我是否需要明確調用並行化?

(FWIW,我所看到的例子使用分區,但我的理解是,這是用來在文件的數據塊分配處理。我的情況是,我有很多的文件。)

謝謝提前。

回答

1

TL; DR還有可能是您的部署沒有問題。

我希望看到21個進程運行

除非你專門配置的火花,使用每個JVM執行單核,沒有理由要做到這一點。與RDD不同,您在問題中已經提到過DataFrame API根本不使用Python工作者,Python UserDefinedFunctions除外。與此同時,JVM執行程序使用線程而不是完整的系統進程(PySpark使用後者來避免GIL)。此外,在獨立模式下的默認spark.executor.cores等於the available cores on the worker的數量。因此,如果沒有額外的配置,您應該看到兩個執行器JVM,每個執行器使用21個數據處理線程。

總的來說,你應該檢查Spark UI,如果你看到任務分配給執行者,一切都應該沒問題。