2016-08-29 35 views
0

我有一個需要用相同的算法處理數百萬行的PostgreSQL表。 我正在使用Python和SQLAlchemy.Core執行此任務。PostgreSQL表的分佈式處理

該算法接受一個或多個行作爲輸入,並返回具有某些更新值的相同數量的行。

id1, id2, NULL, NULL, NULL -> id1, id2, value1, value2, value3 
id1, id3, NULL, NULL, NULL -> id1, id3, value4, value5, value6 
id2, id3, NULL, NULL, NULL -> id2, id3, value7, value8, value9 
... 
id_n, id_m, NULL, NULL, NULL -> id_n, id_m, value_xxx, value_yyy, value_zzz 

我正在使用PC羣集來執行此任務。此羣集運行dask.distributed調度程序和工作人員。

我想,這個任務可以用map函數有效實現。我的想法是,每個工作人員查詢數據庫,選擇用NULL值處理一些行,然後用結果更新它們。

我的問題是:如何編寫SQL查詢,這將允許在工作人員之間分配表的片斷?

我試圖定義行的子集,以便在SQL查詢offsetlimit每個工人,每個工人發出:

SQL:

select * from table where value1 is NULL offset N limit 100; 
... 
update table where id1 = ... and id2 = ... 
     set value1 = value...; 

的Python:

from sqlalchemy import create_engine, bindparam, select, func 
from distributed import Executor, progress 

def process(offset, limit): 
    engine = create_engine(...) 

    # get next piece of work 
    query = select(...).where(...).limit(limit).offset(offset) 

    rows = engine.execute([select]).fetchall() 
    # process rows 

    # submit values to table 
    update_stmt = table.update().where(...).where(...).values(...) 
    up_values = ... 
    engine.execute(update_stmt, up_values) 

if __name__ == '__main__': 
    e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'), 
             port=config('SERVER_PORT'))) 
    n_rows = count_rows_to_process() 
    chunk_size = 100 
    progress(e.map(process, range(0, n_rows, chunk_size))) 

但是,這沒有奏效。

range函數返回偏移列表計算已經開始之前,以及map功能開始process功能之前分發給工人。

然後一些工人已經成功地完成了處理他們的工作塊,將他們的結果提交給表格,並且更新了值。

然後新的迭代開始,新的SELECT ...WHERE value1 is NULL LIMIT 100 OFFSET ...查詢被髮送到數據庫,但是現在偏移量是無效的,因爲它是在先前的工作人員更新表之前計算的。現在減少了NULL值的數量,並且工作人員可以從數據庫中接收空集。

在開始計算之前,我不能使用一個SELECT查詢,因爲它會返回不適合RAM的巨大表。

SQLAlchemy手冊還指出,對於分佈式處理,引擎實例應該在本地爲每個python進程創建。因此,我無法查詢數據庫一次並將返回的光標發送到process函數。

因此,解決方案是正確構建SQL查詢。

+1

你確定你的算法會受益於分佈式處理嗎?它是否顯着受CPU限制?通常情況下,如果使用單個進程,那麼數據集中的重要部分(即使不是全部)也會佔用大量的內存(甚至達到數百萬行的數量級),而如果使用單個進程,則會因爲開銷而導致分配速度更快。無論如何,不​​要使用'LIMIT'和'OFFSET'(這是因爲它必須跳過'OFFSET'行而緩慢),'ORDER BY'主鍵(假設它是'(id1,id2)')並且執行'(id1 ,id2)BETWEEN(1,2)和(3,4)'。 – univerio

+0

我沒有看到任何替代分佈式處理我的情況。該算法在C中實現,並編譯爲PC和ARM的二進制可執行文件,這些文件是從python調用的。 – wl2776

回答

1

一種選擇要考慮的是隨機:

SELECT * 
FROM table 
WHERE value1 IS NULL 
ORDER BY random() 
LIMIT 100; 

在最壞的情況下,你將有幾個工人計算並行同樣的事情。如果它不打擾你,這是最簡單的方法之一。

另一種選擇是各行專爲特定的工人:

UPDATE table 
SET value1 = -9999 
WHERE id IN (
    SELECT id 
    FROM table 
    WHERE value1 IS NULL 
    ORDER BY random() 
    LIMIT 100 
) RETURNING * ; 

這樣你「標記」的您的特定工作人員已經「採取了」以-9999行。所有其他工作人員將跳過這些行,因爲value1不再是NULL。這裏的風險是,如果工作失敗,你不會有一個簡單的方法來回到這些行 - 你必須手動將它們更新回NULL。

+0

你的第二個選擇今天早上出現在我的腦海裏 – wl2776