我在名爲part-0001
,part-0002
等的Linux機器上的單個目錄中有大約200個文件。每個行擁有大約一百萬行相同的列(稱爲'a','b'等等)。讓'a','b'作爲每行的關鍵字(包含許多重複項)。並行化在pyspark中的Spark數據幀組
同時,我建立了一個Spark主機和兩個從機的Spark 2.2.0羣集,共有42個內核可用。地址是spark://XXX.YYY.com:7077
。
然後,我使用PySpark連接到羣集,並按如下方式計算每個唯一對的200個文件的計數。
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
sc = SparkContext("spark://XXX.YYY.com:7077")
sqlContext = SQLContext(sc)
data_path = "/location/to/my/data/part-*"
sparkdf = sqlContext.read.csv(path=data_path, header=True)
dfgrouped = sparkdf.groupBy(['a','b'])
counts_by_group = dfgrouped.count()
這樣做的效果是,我可以看到Spark在一系列消息中前進,它確實返回看似合理的結果。
問題:雖然正在執行此計算,但top並未顯示任何證據表明從屬內核正在執行任何操作。似乎沒有任何並行化。每個從機都有一個在作業之前存在的單個相關Java進程(以及來自其他用戶和後臺系統進程的進程)。所以看起來主人正在做所有的工作。鑑於有200個奇怪的文件,我預計會看到21個進程在每個從機上運行,直到事情結束(這個是我看到當我在一個單獨的實現中明確調用parallelize
時,如下count = sc.parallelize(c=range(1, niters + 1), numSlices=ncores).map(f).reduce(add)
)。
問題:如何確保Spark實際上並行計數?我希望每個核心都能抓取一個或多個文件,對它在文件中看到的配對進行計數,然後將各個結果縮減爲一個DataFrame
。我不應該在頂部看到這個嗎?我是否需要明確調用並行化?
(FWIW,我所看到的例子使用分區,但我的理解是,這是用來在單文件的數據塊分配處理。我的情況是,我有很多的文件。)
謝謝提前。