我有一個需要用相同的算法處理數百萬行的PostgreSQL表。 我正在使用Python和SQLAlchemy.Core執行此任務。PostgreSQL表的分佈式處理
該算法接受一個或多個行作爲輸入,並返回具有某些更新值的相同數量的行。
id1, id2, NULL, NULL, NULL -> id1, id2, value1, value2, value3
id1, id3, NULL, NULL, NULL -> id1, id3, value4, value5, value6
id2, id3, NULL, NULL, NULL -> id2, id3, value7, value8, value9
...
id_n, id_m, NULL, NULL, NULL -> id_n, id_m, value_xxx, value_yyy, value_zzz
我正在使用PC羣集來執行此任務。此羣集運行dask.distributed
調度程序和工作人員。
我想,這個任務可以用map
函數有效實現。我的想法是,每個工作人員查詢數據庫,選擇用NULL值處理一些行,然後用結果更新它們。
我的問題是:如何編寫SQL查詢,這將允許在工作人員之間分配表的片斷?
我試圖定義行的子集,以便在SQL查詢offset
和limit
每個工人,每個工人發出:
SQL:
select * from table where value1 is NULL offset N limit 100;
...
update table where id1 = ... and id2 = ...
set value1 = value...;
的Python:
from sqlalchemy import create_engine, bindparam, select, func
from distributed import Executor, progress
def process(offset, limit):
engine = create_engine(...)
# get next piece of work
query = select(...).where(...).limit(limit).offset(offset)
rows = engine.execute([select]).fetchall()
# process rows
# submit values to table
update_stmt = table.update().where(...).where(...).values(...)
up_values = ...
engine.execute(update_stmt, up_values)
if __name__ == '__main__':
e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'),
port=config('SERVER_PORT')))
n_rows = count_rows_to_process()
chunk_size = 100
progress(e.map(process, range(0, n_rows, chunk_size)))
但是,這沒有奏效。
的range
函數返回偏移列表計算已經開始之前,以及map
功能開始process
功能之前分發給工人。
然後一些工人已經成功地完成了處理他們的工作塊,將他們的結果提交給表格,並且更新了值。
然後新的迭代開始,新的SELECT ...WHERE value1 is NULL LIMIT 100 OFFSET ...
查詢被髮送到數據庫,但是現在偏移量是無效的,因爲它是在先前的工作人員更新表之前計算的。現在減少了NULL值的數量,並且工作人員可以從數據庫中接收空集。
在開始計算之前,我不能使用一個SELECT
查詢,因爲它會返回不適合RAM的巨大表。
SQLAlchemy手冊還指出,對於分佈式處理,引擎實例應該在本地爲每個python進程創建。因此,我無法查詢數據庫一次並將返回的光標發送到process
函數。
因此,解決方案是正確構建SQL查詢。
你確定你的算法會受益於分佈式處理嗎?它是否顯着受CPU限制?通常情況下,如果使用單個進程,那麼數據集中的重要部分(即使不是全部)也會佔用大量的內存(甚至達到數百萬行的數量級),而如果使用單個進程,則會因爲開銷而導致分配速度更快。無論如何,不要使用'LIMIT'和'OFFSET'(這是因爲它必須跳過'OFFSET'行而緩慢),'ORDER BY'主鍵(假設它是'(id1,id2)')並且執行'(id1 ,id2)BETWEEN(1,2)和(3,4)'。 – univerio
我沒有看到任何替代分佈式處理我的情況。該算法在C中實現,並編譯爲PC和ARM的二進制可執行文件,這些文件是從python調用的。 – wl2776