2012-10-30 26 views
0

我有工人和任務做:如何在Python中將工作公平分配給工作人員? - 分裂迭代成同樣大小的塊

workers = ['peter', 'paul', 'mary'] 
tasks = range(13) 

現在我想將任務分割成塊或工作的批次,所以每個工人可以在一個批處理工作,並造成大約與其他人相同的工作量。在我的現實生活中,我想將批處理作業安排到計算場。批處理作業應該並行運行。實際的時間表&調度是由一個商業級的工具,如LSF或網格。

什麼,我會想到一些例子:

>>> distribute_work(['peter', 'paul', 'mary'], range(3)) 
[('peter', [0]), ('paul', [1]), ('mary', [2])] 
>>> distribute_work(['peter', 'paul', 'mary'], range(6)) 
[('peter', [0, 3]), ('paul', [1, 4]), ('mary', [2, 5])] 
>>> distribute_work(['peter', 'paul', 'mary'], range(5)) 
[('peter', [0, 3]), ('paul', [1, 4]), ('mary', [2])] 

這個問題是非常相似的問題hereherehere

不同的是,我想這些功能,在訂單或優先權:

  1. 沒有使用len,如果可能的話,不在內部建立長數據結構
  2. 接受發電機
  3. 返回發電機
  4. 儘可能多使用STDLIB組件儘可能

一些旁註上要求:

  • 故意不類型的字典:我工人可以執行多個批處理的相同名稱(unix主機名)。如果你的解決方案使用了字典,那很好,因爲我們總是可以通過批量枚舉來查找工作人員。
  • 任意長度:工作人員和任務都可以是任意長度大於等於1的迭代次數。並且他們不必像上面的示例中所示的那樣均勻地分割,其中Mary只獲取一個任務。
  • 排序:對我來說並不重要。我猜其他人可能會喜歡[0,1],[2,3],[5]這樣的順序,但我不在乎。如果您的解決方案可以保留或切換訂單,那麼也許值得向其他人指出。

我試圖總結我的周圍itertools頭,這方面的問題,並與下面的代碼走過來說明這個問題:

from itertools import * 

def distribute_work(workers, tasks): 
    batches = range(len(workers)) 
    return [ (workers[k], 
       [t[1] for t in i] 
       ) for (k,i) in groupby(sorted(zip(cycle(batches), 
                tasks), 
               key=lambda t: t[0]), 
             lambda t: t[0]) ] 

這滿足4,但排序很可能違反了1 ..和2./3。甚至都沒有想過。

也許有一些簡單的解決方案,以我沒有想到的方式組合一些stdlib組件。但也許不是。任何接受者?

回答

1

您是否需要預批次?

爲什麼不只是有一個隊列,並讓每個工人在完成工作單位時從隊列中彈出?

+0

好點。必須澄清的是,這是關於應該爲需要並行運行的機器安排作業。整個工作量正在減輕工作量,以減少從開始到結果的延遲。 – cfi

1

Tyler's answer

def doleOut(queue, workers): 
    for worker,task in itertools.izip(itertools.cycle(workers),queue): 
     yield worker,task 

這將繼續,只要有一個隊列返回(worker, task)元組。所以,如果你有一個阻擋waitForMoreWork你可以這樣做:

queue = [] 
doler = distribute_work(workers, queue) 
while 1: 
    queue.append(waitForMoreWork) 
    currentqueuelen = len(queue) 
    for i in range(0,queuelen): 
     worker,item = doler.next() 
     worker.passitem(item) 

這樣就會阻塞,直到有更多的隊列中的項目,然後分發這些,然後再次阻止。您可以設置您的waitForMoreWork表達式,以便一次發出儘可能多的項目。

+0

對於Python 3,請用'zip'替換'itertools.izip'。 – cfi

+0

不幸的是,這是我有一半的解決方案 - 儘管更簡單和更好。這隻產生一個發電機,我需要每個工作人員一個。 – cfi

+1

@cfi:除非您有一種方法分配負載,否則您將如何正確分配它們?唯一的方法是枚舉它們並使用模或某些值,但某處您必須提供枚舉... –

1

我想你想用multiprocessing.Pool.imap來處理你的工人和分配他們的工作。我相信它會做你想做的一切。

jobs = (some generator)     # can consume jobs from a generator 
pool = multiprocessing.Pool(3)   # set number of workers here 
results = pool.imap(process_job, jobs) # returns a generator 

for r in results:       # loop will block until results arrive 
    do_something(r) 

如果結果順序對您的應用程序無關緊要,您也可以使用imap_unordered

+0

嗯。如果我用Python做這個工作派遣,我想可以。這不是真的回答了這個問題,但它可能很好地解決了我的整體問題。我還沒打算在同一個python腳本中編寫實際的作業調度,可能不想這樣做。整個系統更加複雜,實際的工作調度目前發生在所有配置已經可用的shell腳本中。我只是試圖破解Python中的十個班輪來處理排序/分配問題。如果我讓'def process_job'創建/打印shell命令,這可能仍然有效。 – cfi

+1

@cfi:嗯,我不知道我明白。如果你不想從一開始就完全使用'jobs'生成器(就像你在代碼中做的那樣),你將需要讓Python控制工作者和作業提供者之間的同步(至少像'multiprocessing.Queue')。如果你要走這條路線,我會讓'multiprocessing'模塊儘可能多地處理它,而不是自己重新發明'Pool'類。但是,如果你更多地解釋你的系統架構,我們可以想出其他的東西嗎? – Blckknght

0

好吧,說它不可能,這是一個想法。也許這是我應該轉向codereview的東西 - 我非常感興趣的評論這種內存開銷多少。換句話說,我不知道這是否真的解決了任務列表很長且尺寸未知的問題。 As Blckknght mentioned multiprocessing might be the better alternative

代碼:

import itertools 

def distribute_work(workers, tasks): 
    """Return one generator per worker with a fair share of tasks 

    Task may be an arbitrary length generator. 
    Workers should be an iterable. 
    """ 
    worker_count = len(workers) 
    worker_ids = range(worker_count) 
    all_tasks_for_all_workers = itertools.tee(tasks, worker_count) 
    assignments = [ (workers[id], itertools.islice(i, id, None, worker_count)) 
        for (id,i) in enumerate(all_tasks_for_all_workers) ]  
    return(assignments) 

的算法是

  1. 複製原始任務列表中一次爲每個工人。由於這只是複製生成器對象,所以它應該與內存中任務列表的大小無關。即使這是一個相對昂貴的操作,對於非常大的任務列表來說,這只是一次啓動成本,對內存來說也是微不足道的。
  2. 要將任務分配給一個工作人員,每個工作人員必須獲取任務列表的一部分。如果#W是工人的數量,第一個工人需要工作0#W2*#W3*#W等。第二工人需要0+1#W+12*#W+13*#W+1等每個工人的拼接可以itertools.islice
完成

對於純粹的拆分/任務分配,工作人員的名字並不是真正需要這個功能。但工人的數量是。改變它可以使函數更加通用和有用,並使返回值更易於理解。爲了回答我自己的問題,我將按原樣離開該功能。

用法及結果:

>>> for (worker,tasks) in distribute_work(['peter', 'paul', 'mary'], range(5)): 
... print(worker, list(tasks)) 
... 
peter [0, 3] 
paul [1, 4] 
mary [2] 

而且它也能處理那裏的工人具有相同的名稱,但不同的實體的情況下:

>>> for (worker,tasks) in distribute_work(['p', 'p', 'mary'], range(5)): 
... print(worker, list(tasks)) 
... 
p [0, 3] 
p [1, 4] 
mary [2] 
0

這裏有一個方法,我想:

parallelism = os.cpu_count() 
num_todos = len(todos) 

# this zip fanciness makes each chunk stripe through the data sequentially overall so that the 
# first items still get done first across all the workers 
chunksize = math.ceil(num_todos/parallelism) 
chunks = list(itertools.zip_longest(*[todos[i:i+chunksize] for i in range(0, num_todos, chunksize)])) 
chunks = [[c for c in chunk if c is not None] for chunk in chunks] 

with Pool(processes=parallelism) as pool: 
    tasks = [pool.apply_async(my_function, args=(chunk)) for chunk in chunks] 
    [task.get() for task in tasks] 

取決於你是否需要累積結果,你可以調整,但有趣的參數對我來說,工作人員正在協作,以全局順序完成任務(在我的情況下,處理連續的圖像幀,以便我可以看到事物看起來如何,因爲所有的cpus都在起動)。