2009-01-21 84 views
42

我有一個200個命令的列表/隊列,我需要在Linux服務器的shell中運行。從Linux上的命令隊列並行處理(bash,python,ruby ... whatever)

我只希望一次運行最多10個進程(從隊列中)。一些過程需要幾秒鐘才能完成,其他過程需要更長的時間。

當一個進程完成時,我希望下一個命令從隊列中「彈出」並執行。

有沒有人有解決這個問題的代碼?

進一步闡述:

有200件作品需要做的,在某種隊列。我最多隻想做10件作品。當一個線程完成一項工作時,它應該向隊列請求下一項工作。如果隊列中沒有更多的工作,線程就會死掉。當所有線程都已經死亡時,意味着所有的工作都已完成。

我試圖解決的實際問題是使用imapsync將200個郵箱從舊郵件服務器同步到新郵件服務器。某些用戶擁有大量郵箱並需要很長時間才能同步,其他用戶的郵箱非常小,並且可以快速同步。

回答

38

我想你可以使用make和make -j xx命令來做到這一點。

也許這樣

all : usera userb userc.... 

usera: 
     imapsync usera 
userb: 
     imapsync userb 
.... 

化妝-j 10 -f生成文件生成文件

+1

這個工作正如我所希望的那樣。我寫了一些代碼來生成Makefile。它結束了超過1000行。謝謝! – mlambie 2009-01-21 07:01:36

+1

+1這確實很聰明! – progo 2011-05-15 17:56:58

-3

您能詳細說明您的意思嗎並行?這聽起來像你需要在隊列中實現某種鎖定,所以你的輸入不會被選中兩次,等等,這些命令只運行一次。

大多數隊列系統作弊 - 他們只是寫一個巨大的待辦事項列表,然後選擇例如十個項目,工作,並選擇下十個項目。沒有並行化。

如果您提供更多詳細信息,我相信我們可以幫助您。

7

GNU make(也可能是其他的實現)具有-j參數,它控制一次運行多少個作業。當一份工作完成時,make會開始另一個。

4

好吧,如果他們在很大程度上是相互獨立的,我倒是覺得在以下方面:

Initialize an array of jobs pending (queue, ...) - 200 entries 
Initialize an array of jobs running - empty 

while (jobs still pending and queue of jobs running still has space) 
    take a job off the pending queue 
    launch it in background 
    if (queue of jobs running is full) 
     wait for a job to finish 
     remove from jobs running queue 
while (queue of jobs is not empty) 
    wait for job to finish 
    remove from jobs running queue 

注意的是,在主迴路尾測試意味着,如果「工作運行隊列」有空間while循環迭代時 - 防止循環的提前終止。我認爲這個邏輯是正確的。

我可以很容易地看到如何在C中做到這一點 - 它在Perl中也不會那麼困難(因此在其他腳本語言(Python,Ruby,Tcl等)中並不太難)。我完全不確定我是否想要在shell中執行它 - shell中的wait命令會等待所有孩子終止,而不是讓某個孩子終止。

+0

您也可以對特定的子進程使用wait命令。可以給出任意數量的參數,其中每個參數都可以是一個PID或作業ID。 – 2013-10-01 14:56:55

+1

@BrianMinton:你說得對,你可以用`wait`列出特定的PID,但你仍然可以得到'所有人都死了'的行爲,而不是'第一個死亡'這是這個代碼真正需要的。 – 2013-10-01 15:10:30

42

在shell上,可以使用xargs對並行命令進行排隊處理。例如,對於具有3個總是睡並聯,睡眠對每個1秒,和在總執行10點睡覺做

echo {1..10} | xargs -d ' ' -n1 -P3 sh -c 'sleep 1s' _ 

而且,它還將睡總共4秒。如果您有名稱的列表,並想通過名稱來執行,再執行並行3個指令命令,請

cat names | xargs -n1 -P3 process_name 

將執行命令process_name aliceprocess_name bob等。

+0

哇,我一直使用`xargs`,從來沒有料到它會有這個選項! – Warrick 2013-07-19 12:00:33

+0

對於第二個示例,您如何修改命令以使`process_name`可以使用多個參數?我想要做這樣的事情:`cat commands.txt | xargs -n1 -P3 eval`其中`commands.txt`中有一堆命令(每行一個,每個都有多個參數)。問題是`eval`不起作用,因爲它是一個shell內置命令 – Eddy 2015-07-23 10:12:33

3

在Python中,你可以嘗試:

import Queue, os, threading 

# synchronised queue 
queue = Queue.Queue(0) # 0 means no maximum size 

# do stuff to initialise queue with strings 
# representing os commands 
queue.put('sleep 10') 
queue.put('echo Sleeping..') 
# etc 
# or use python to generate commands, e.g. 
# for username in ['joe', 'bob', 'fred']: 
# queue.put('imapsync %s' % username) 

def go(): 
    while True: 
    try: 
     # False here means no blocking: raise exception if queue empty 
     command = queue.get(False) 
     # Run command. python also has subprocess module which is more 
     # featureful but I am not very familiar with it. 
     # os.system is easy :-) 
     os.system(command) 
    except Queue.Empty: 
     return 

for i in range(10): # change this to run more/fewer threads 
    threading.Thread(target=go).start() 

未經測試...

(當然,python本身是單線程的,你仍然應該從wa中獲得多線程的好處儘管如此,IO也是如此。)

1

Python的multiprocessing module似乎很適合您的問題。這是一個支持按流程進行線程化的高級包。

13

對於這種工作PPSS是這樣寫的:並行處理shell腳本。谷歌這個名字,你會發現它,我不會linkpam。

24

Parallel是爲此目的而精心製作的。

cat userlist | parallel imapsync 

一個相對於其他的解決方案Parallel美女的是,它可以確保輸出不混合。在Paralleltraceroute例如正常工作:

(echo foss.org.my; echo www.debian.org; echo www.freenetproject.org) | parallel traceroute 
0

zsh中的簡單函數用於在不超過4個子殼中並行化作業,在/ tmp中使用鎖定文件。

唯一的非平凡部分是在第一測試中的glob國旗:

  • #q:使文件名在測試
  • [4]通配符:返回第四結果僅
  • N:忽略錯誤空結果

應該很容易將其轉換爲posix,儘管它會稍微冗長些。

不要忘記在\"的工作中逃避任何報價。

#!/bin/zsh 

setopt extendedglob 

para() { 
    lock=/tmp/para_$$_$((paracnt++)) 
    # sleep as long as the 4th lock file exists 
    until [[ -z /tmp/para_$$_*(#q[4]N) ]] { sleep 0.1 } 
    # Launch the job in a subshell 
    (touch $lock ; eval $* ; rm $lock) & 
    # Wait for subshell start and lock creation 
    until [[ -f $lock ]] { sleep 0.001 } 
} 

para "print A0; sleep 1; print Z0" 
para "print A1; sleep 2; print Z1" 
para "print A2; sleep 3; print Z2" 
para "print A3; sleep 4; print Z3" 
para "print A4; sleep 3; print Z4" 
para "print A5; sleep 2; print Z5" 

# wait for all subshells to terminate 
wait