2017-08-31 17 views
0

我正在嘗試爲涉及讀取大量文件並分析它們的任務創建工作人員。爲所有進程存儲只讀字符串數組的多處理

我想是這樣的:

list_of_unique_keys_from_csv_file = [] # About 200mb array (10m rows) 
# a list of uniquekeys for comparing inside worker processes to a set of flatfiles 

我需要更多的線程,因爲它會很慢,做一個過程(每個文件10分鐘)的比較。

我有另一套平面文件,我比較CSV文件,以查看是否存在唯一鍵。這看起來像是一個地圖減少類型的問題。

main.py:

def worker_process(directory_glob_of_flat_files, list_of_unique_keys_from_csv_file): 
    # Do some parallel comparisons "if not in " type stuff. 
    # generate an array of 
    # lines of text like : "this item_x was not detected in CSV list (from current_flatfile)" 
    if current_item not in list_of_unique_keys_from_csv_file: 
    all_lines_this_worker_generated.append(sometext + current_item) 
    return all_lines_this_worker_generated 




def main(): 
    all_results = [] 
    pool = Pool(processes=6) 
    partitioned_flat_files = [] # divide files from glob by 6 
    results = pool.starmap(worker_process, partitioned_flat_files, {{{{i wanna pass in my read-only parameter}}}}) 
    pool.close() 
    pool.join() 

    all_results.extend(results) 
    resulting_file.write(all_results) 

我同時使用Linux和Windows環境中,所以也許我需要的東西跨平臺兼容(整個fork()的討論)。

主要問題:我是否需要某種管道或隊列,似乎無法找到一個很好的示例,說明如何在每個工作進程的大型只讀字符串數組中傳輸副本?

回答

1

您可以分開您的只讀參數,然後將它們傳入。multiprocessing模塊是跨平臺兼容的,所以不用擔心它。實際上,每個進程,甚至是子進程都有自己的資源,這意味着無論你如何傳遞參數,它都會保留原始文件的副本,而不是共享它。在這種簡單的情況下,當您將參數從主進程傳遞到子進程時,Pool會自動創建變量的副本。因爲子進程只有原始副本的副本,所以修改不能共享。在這種情況下無關緊要,因爲您的變量是隻讀的。

但要小心你的代碼,你需要用你需要到 迭代集合的參數,例如:

def add(a, b): 
    return a + b 

pool = Pool() 
results = pool.starmap(add, [(1, 2), (3, 4)]) 
print(results) 
# [3, 7] 
+0

是啊,但我在的主要問題是,每個進程/線程一切,需要獲取SAME全局只讀'list_of_unique_keys_from_csv_file'數組......但是,當我試圖打印出我們在進程內部得到的內容時,它似乎已經分塊/分割了數組或其他東西。這不是我想要的。我想要在所有線程/所有進程中複製ORIGINAL陣列。也許我需要讀取每個worker中的主文件,但是我覺得讓200名工作人員在啓動時讀取完全相同的文件會很愚蠢(所以我的主線程讀取它然後將數組複製到全部) – Dexter

+0

@Dexter這很容易,'results = pool.starmap(func,[(one_file,list_of_unique_keys_from_csv_file)for one_file in partitioned_flat_files])' – Sraw

+0

是的工作,我想知道爲什麼itertools的方式不起作用。謝謝。 – Dexter

相關問題