2012-11-12 51 views
0

我需要將進程中的作業從一個隊列中處理出來,同時IO執行異步。這非常簡單。該問題是這些作業可以將其他項添加到隊列在Ruby中構建一個異步隊列

我想我一直在擺弄這個問題時間太長,所以我的大腦多雲 - 它不應該太難。我總是想出一個任意或場景:

  1. 隊列可以異步執行作業,並且結果可以在之後加入。
  2. 隊列可以同步執行作業,直到最後一次完成並且隊列爲空。

我已經一切從EventMachine的和歌利亞(兩者都可以使用EM::HttpRequest)來擺弄賽璐珞(實際上從未得到周圍雖然建立一些與它),寫的用纖維枚舉。我的大腦雖然油炸。

我想什麼,簡單地說,就是要能夠做到這一點:

items = [1,2,3] 
items.each do |item| 
    if item.has_particular_condition? 
    items << item.process_one_way 
    elsif item.other_condition? 
    items << item.process_another_way 
    # ... 
    end 
end 

#=> [1,2,3,4,5,6] 

...其中4,5,6人處理的一整套原始項目的所有成果, 7,8和9是來自處理4,5和6的結果。我不需要擔心無限期地處理隊列,因爲我正在處理的數據將在幾次迭代之後結束。

高層指導,評論,鏈接到其他庫等都是受歡迎的,以及較低級別的實現代碼示例。

+0

是的,異步系統可能很難建立。你有我的道義上的支持:) –

+1

我的第一個建議是:睡在它上面。當你開始將頭撞向牆壁時,事情似乎變得複雜......從經驗中,我知道現在是時候稱呼它。第二天,問題通常很容易解決。我喜歡在睡夢中解決問題 - 它不需要「努力」,第二天你會覺得自己像個天才。 – Casper

+0

@Casper我想我沒有指定「太長」需要在這裏和那裏工作幾個星期。 :/ – coreyward

回答

0

我最終實現了一些不太理想的東西 - 基本上只是將EM光纖迭代器封裝在循環中,一旦沒有新的結果排隊就終止循環。

require 'set' 

class SetRunner 
    def initialize(seed_queue) 
    @results = seed_queue.to_set 
    end 

    def run 
    begin 
     yield last_loop_results, result_bucket 
    end until new_loop_results.empty? 

    return @results 
    end 

    def last_loop_results 
    result_bucket.shift(result_bucket.count) 
    end 

    def result_bucket 
    @result_bucket ||= @results.to_a 
    end 

    def new_loop_results 
    # .add? returns nil if already in the set 
    result_bucket.each { |item| @results.add? item }.compact 
    end 
end 

然後,用EventMachine的使用它:

queue = [1,2,3] 
results = SetRunner.new(queue).run do |set, output| 
    EM::Synchrony::FiberIterator.new(set, 3).each do |item| 
    output.push(item + 3) if item <= 6 
    end 
end 
# => [1,2,3,4,5,6,7,8,9] 

然後每組將獲得與傳遞給FiberIterator併發級別運行,但每一組的結果將在下次迭代得到運行的外部SetRunner循環。

0

我過去有類似的要求,你需要的是一個堅實的,高性能的工作隊列。我建議你查看我在一年前發現的beanstalkd,並且一直用它來可靠地處理成千上萬的紅寶石作業。

特別是,我已經開始圍繞beanstalkd開發固體ruby庫。特別是,請務必使用beanstalkd檢查出在Ruby中是否爲生產就緒隊列的backburner。語法和設置非常簡單,定義了作業過程如何快速,處理作業失敗和重試都是內置的,作業調度和更多。

讓我知道如果您有任何問題,但我認爲豆杆和backburner將適合您的要求相當好。

+0

這可能會起作用,但我在HTTP請求生命週期中執行工作,並且我需要解析並在所有作業完成時將結果返回給客戶端。有關如何使用Backburner進行管理的任何提示? – coreyward