給定以下的Apache火花(Python)的碼(它是工作):apache如何在mapPartitions的spark中分配任務?
import sys
from random import random
from operator import add
import sqlite3
from datetime import date
from datetime import datetime
from pyspark import SparkContext
def agePartition(recs):
gconn = sqlite3.connect('/home/chris/test.db')
myc = gconn.cursor()
today = date.today()
return_part = []
for rec in recs:
sql = "select birth_date from peeps where name = '{n}'".format(n=rec[0])
myc.execute(sql)
bdrec = myc.fetchone()
born = datetime.strptime(bdrec[0], '%Y-%m-%d')
return_part.append((rec[0], today.year - born.year - ((today.month, today.day) < (born.month, born.day))))
gconn.close()
return iter(return_part)
if __name__ == "__main__":
"""
Usage: pi [partitions]
"""
sc = SparkContext(appName="PythonDBTEST")
print('starting...')
data = [('Chris', 1), ('Amanda', 2), ('Shiloh', 2), ('Sammy', 2), ('Tim', 1)]
rdd = sc.parallelize(data,5)
rslt_collect = rdd.mapPartitions(agePartition).collect()
for x in rslt_collect:
print("{n} is {a}".format(n=x[0], a=x[1]))
sc.stop()
在兩個計算/從節點設置共8個CPU將每個分區被創建爲一個任務,並分配到2個節點,以便所有5個分區並行運行?如果沒有,還需要做些什麼才能確保發生?
此處的目的是測試如何在每個從屬工作進程中保持全局數據庫連接處於活動狀態,以便不必爲RDD中處理的每個記錄重新打開數據庫連接。我在這個例子中使用SQLite,但它將是一個SQLCipher數據庫,並且在數據庫連接上打開需要更多的時間。
有沒有這樣做的原因,而不是使用'JdbcRDD'或類似的東西? – climbage
簡短的回答是肯定的。如前所述,我已經知道它的工作原理,並且打算保持db連接處於打開狀態,而不是爲每個要處理的記錄打開一個新連接(假設我在初始RDD中有超過五條記錄)。現有的non-spark進程在單臺計算機上並行運行,並將要處理的數據分解爲每個CPU的列表,併爲每個CPU打開一個連接,然後處理該列表。如果必須爲每條記錄打開一個連接,那麼它將會非常緩慢...... –
...我發佈的這段代碼在火花上下文中解決了這個問題,但我試圖瞭解每個分區如何處理。 –