2016-01-15 81 views
0

給定以下的Apache火花(Python)的碼(它是工作):apache如何在mapPartitions的spark中分配任務?

import sys 
from random import random 
from operator import add 
import sqlite3 
from datetime import date 
from datetime import datetime 

from pyspark import SparkContext 


def agePartition(recs): 
    gconn = sqlite3.connect('/home/chris/test.db') 
    myc = gconn.cursor() 
    today = date.today() 
    return_part = [] 
    for rec in recs: 
     sql = "select birth_date from peeps where name = '{n}'".format(n=rec[0]) 
     myc.execute(sql) 
     bdrec = myc.fetchone() 
     born = datetime.strptime(bdrec[0], '%Y-%m-%d') 
     return_part.append((rec[0], today.year - born.year - ((today.month, today.day) < (born.month, born.day)))) 
    gconn.close() 
    return iter(return_part) 

if __name__ == "__main__": 
    """ 
     Usage: pi [partitions] 
    """ 
    sc = SparkContext(appName="PythonDBTEST") 
    print('starting...') 
    data = [('Chris', 1), ('Amanda', 2), ('Shiloh', 2), ('Sammy', 2), ('Tim', 1)] 
    rdd = sc.parallelize(data,5) 
    rslt_collect = rdd.mapPartitions(agePartition).collect() 

    for x in rslt_collect: 
     print("{n} is {a}".format(n=x[0], a=x[1])) 

    sc.stop() 

在兩個計算/從節點設置共8個CPU將每個分區被創建爲一個任務,並分配到2個節點,以便所有5個分區並行運行?如果沒有,還需要做些什麼才能確保發生?

此處的目的是測試如何在每個從屬工作進程中保持全局數據庫連接處於活動狀態,以便不必爲RDD中處理的每個記錄重新打開數據庫連接。我在這個例子中使用SQLite,但它將是一個SQLCipher數據庫,並且在數據庫連接上打開需要更多的時間。

+0

有沒有這樣做的原因,而不是使用'JdbcRDD'或類似的東西? – climbage

+0

簡短的回答是肯定的。如前所述,我已經知道它的工作原理,並且打算保持db連接處於打開狀態,而不是爲每個要處理的記錄打開一個新連接(假設我在初始RDD中有超過五條記錄)。現有的non-spark進程在單臺計算機上並行運行,並將要處理的數據分解爲每個CPU的列表,併爲每個CPU打開一個連接,然後處理該列表。如果必須爲每條記錄打開一個連接,那麼它將會非常緩慢...... –

+0

...我發佈的這段代碼在火花上下文中解決了這個問題,但我試圖瞭解每個分區如何處理。 –

回答

1

假設羣集中有8個可用插槽(cpus)。您最多可以同時處理8個分區。在你的情況下,你有5個分區,所以它們都應該被並行處理。這將是5個到數據庫的併發連接。

我的期望是每核心一個,所以如果記錄數量大得多,我就不會不斷地重新創建數據庫連接。

就你而言,它將是每個分區。如果你有20個分區和8個核心,你仍然會創建連接20次。

+0

這是有道理的;謝謝你的解釋。 –