2012-03-13 148 views
1

我用(想起Heroko工作線程,或EC2實例)M任務處理和N個並行處理資源,其中M >> N.產卵多個並行任務

我會推出自己的系統,但它似乎很可能已經有一個調試過的軟件包或gem:你推薦什麼? (現在我想想,我能折磨延遲::工作去做這個。)

任務可以幾乎任何語言編寫的 - 即使是一個shell腳本,將做的工作。 '母船'是帶有PostgreSQL數據庫的Ruby On Rails。其基本思想是當資源準備好處理任務時,它會要求母船在隊列中處理下一個未處理的任務並開始處理它。如果工作失敗,在放棄之前重新嘗試幾次。結果可以寫入平面文件或寫入PostgreSQL數據庫。

(而且,不,這不是生成的垃圾郵件。我研究幾大社交網絡的degree distribution

回答

2

我認爲這是一個delayed_job的https://github.com/collectiveidea/delayed_job或resque https://github.com/defunkt/resque工作,如你所說。

+0

同意。我不明白爲什麼這個解決方案會被視爲「受到折磨」。 – betamatt 2012-03-14 18:27:11

+0

我的意思是說,有很多細節需要注意,並超出了D :: J或Resque提供的工作流程,例如分派任務並處理錯誤。但這很可能是我將要走的路。 – 2012-03-14 22:26:07

+0

接受這個問題。儘管如此,我將使用亞馬遜的SQS作爲整個工作流程的一部分。 – 2012-03-24 13:00:45

2

這就要滾動您自己,但但如果你的並行任務沒有資源密集型的,這是一個相當快速的解決方案。另一方面,如果他們是資源密集型的,你會想要實現更強大的功能。

您可以用Process::fork開始每個線程(如果該進程是紅寶石),或Process::exec,或者Process::spawn(如果進程是別的東西)。然後使用Process::waitall來完成子流程。

下面,我用了一個Hash保持功能本身以及對PID的。這當然可以改善。

# define the sub-processes 
sleep_2_fail = lambda { sleep 2; exit -1; } 
sleep_2_pass = lambda { sleep 2; exit 0; } 
sleep_1_pass = lambda { sleep 1; exit 0; } 
sleep_3_fail = lambda { sleep 3; exit -1; } 

# use a hash to store the lambda's and their PID's 
sub_processes = Hash.new 

# add the sub_processes to the hash 
# key = PID 
# value = lambda (can use to be re-called later on) 
sub_processes.merge! ({ Process::fork { sleep_2_fail.call } => sleep_2_fail }) 
sub_processes.merge! ({ Process::fork { sleep_2_pass.call } => sleep_2_pass }) 
sub_processes.merge! ({ Process::fork { sleep_1_pass.call } => sleep_1_pass }) 
sub_processes.merge! ({ Process::fork { sleep_3_fail.call } => sleep_3_fail }) 

# starting time of the loop 
start = Time.now 

# use a while loop to wait at most 10 seconds or until 
# the results are empty (no sub-processes) 
while ((results = Process.waitall).count > 0 && Time.now - start < 10) do 
    results.each do |pid, status| 
    if status != 0 
     # again add the { PID => lambda } to the hash 
     sub_processes.merge! ({ Process::fork { sub_processes[pid].call } => sub_processes[pid] }) 
    end 
    # delete the original entry 
    sub_processes.delete pid 
    end 
end 

ruby-docwaitall是有幫助的。