2013-10-04 33 views
4

我想知道是否可以停止已經執行的操作。紅寶石事件機器停止或終止操作

require 'rubygems' 
require 'em-websocket' 

EM.run do 
    EM::WebSocket.start(:host => '0.0.0.0', :port => 8080) do |ws| 
    ws.onmessage do |msg| 
     op = proc do 
     sleep 5 # Thread safe IO here that is safely killed 
     true 
     end 

     callback = proc do |result| 
     puts "Done!" 
     end 

     EM.defer(op, callback) 
    end 
    end 
end 

這是一個示例web套接字服務器。有時候,當我收到一條消息時,我想要做一些IO,稍後在另一條消息可能需要讀取同樣的東西時,接下來的東西總是優先於先前的東西。所以我想取消第一個操作並做第二個操作。

+0

實際上,你不是在任何給定的點使用一個線程(除了反應器線程)嗎? – Kashyap

+2

我_think_我得到了你正在尋找的答案(它工作,順便說一句)。但我不知道它是否有任何不必要的副作用:/這是我的解決方案:https://gist.github.com/kgrz/6826255 – Kashyap

+0

問題不完全清楚,爲什麼我想要這樣做。是的,在這個例子中,我只會在任何給定的點上使用一個線程。我的真實世界的例子是我有兩種類型的消息進來。類型1執行長時間運行的IO,類型2執行各種各樣的事情。當類型1消息進來時,它推遲並且需要很長時間,意味着當多個類型2消息進入時,執行它們的操作和回調。我基本上只想要一個類型1操作始終進行,不會干擾任何類型2操作。 –

回答

1

這是我的解決方案。它與EM.queue解決方案類似,只是使用散列。

require 'rubygems' 
require 'em-websocket' 
require 'json' 

EM.run do 
    EM::WebSocket.start(:host => '0.0.0.0', :port => 3333) do |ws| 
    mutex = Mutex.new # to make thread safe. See https://github.com/eventmachine/eventmachine/blob/master/lib/eventmachine.rb#L981 
    queue = EM::Queue.new 
    ws.onmessage do |msg| 
     message_type = JSON.parse(msg)["type"] 
     op = proc do 
     mutex.synchronize do 
      if message_type == "preferred" 
      puts "killing non preferred\n" 
      queue.size.times { queue.pop {|thread| thread.kill } } 
      end 
      queue << Thread.current 
     end 

     puts "doing the long running process" 
     sleep 15 # Thread safe IO here that is safely killed 
     true 
     end 

     callback = proc do |result| 
     puts "Finished #{message_type} #{msg}" 
     end 

     EM.defer(op, callback) 
    end 
    end 
end 
+0

你不需要爲此添加互斥體嗎? – Kashyap

+0

我嘗試了很多次,每次都有效。這裏的文檔https://github.com/eventmachine/eventmachine/blob/master/lib/eventmachine.rb#L981似乎同意你的看法,所以我在你的回答中提出了一些建議。應該已經發布了,而不是github,你會得到信用! – earlonrails