2012-04-20 66 views
1

我目前正在建設一個軌道的過程,並且需要多個工作進程的architechture被告知某些事件(比如創建一個對象)的。是否有一個Ruby消息總線的寶石?

Rails 
    |   API Worker 
    +----------o------o--------o------ - - - 
         Some other 
          daemon 

我想做到以下幾點

class Article 
    after_creation do 
    MessageBus.send type: "article-created", id: self.id 
    end 
end 

雖然過程(API,工人,守護進程,...)只訂閱消息總線和塊被調用時消息進來。

MessageBus.subscribe do |msg| 
    if msg['type'] == 'article-created' 
    # if this is my websocket daemon, I would push the article to the browser 
    # if this is my indexing daemon, I would reindex the full-text search 
    # if this is ... you get the deal. 
    end 
end 

目前我使用,我推JSON與UNIXSocketEventMachine.start_unix_domain_server得到它的一個本地的Unix域套接字。但是,這隻允許雙向溝通。我也想過使用resque,但是當我需要一輛公共汽車時,這更多是一個消息隊列。這取決於redis。我很確定必須有一個寶石,它實現了紅寶石的一些消息總線,但谷歌搜索沒有導致任何結果

+2

你嘗試EventMachine的渠道? http://eventmachine.rubyforge.org/EventMachine/Channel.html – tommasop 2012-04-20 14:56:12

+0

1爲EventMachine的通道。需要考慮的另一件事(如果你的工作人員和主人在同一個進程空間中運行)是Ruby對觀察者模式a [Observable](http://www.ruby-doc.org/stdlib-1.9) .3/libdoc/observer/rdoc/Observable.html) – 2012-04-20 15:31:02

+2

請參閱:https://github.com/SamSaffron/message_bus – 2013-05-22 11:04:13

回答

1

最後,我使用Eventmachine通道破解了一個快速的自己的解決方案。

這是我的服務器。基本上是一個客戶端連接到/tmp/messagebus.sock和發送數據。所有推入套接字的東西都會發送給所有其他客戶端。

require 'rubygems' 
require 'eventmachine' 

module Messagebus 
    class Server 
    attr_accessor :connections 
    attr_accessor :channel 

    def initialize 
     @connections = [] 
     @channel = EventMachine::Channel.new 
    end 

    def start 
     @signature = EventMachine.start_unix_domain_server '/tmp/messagebus.sock', Connection do |conn| 
     conn.server = self 
     end 
    end 

    def stop 
     EventMachine.stop_server(@signature) 

     unless wait_for_connections_and_stop 
     EventMachine.add_periodic_timer(1) { wait_for_connections_and_stop } 
     end 
    end 

    def wait_for_connections_and_stop 
     if @connections.empty? 
     EventMachine.stop 
     true 
     else 
     puts "Waiting for #{@connections.size} connection(s) to finish ..." 
     false 
     end 
    end 
    end 

    class Connection < EventMachine::Connection 
    attr_accessor :server 

    def post_init 
     log "Connection opened" 
    end 

    def server=(server) 
     @server = server 

     @subscription = server.channel.subscribe do |data| 
     self.log "Sending #{data}" 
     self.send_data data 
     end 
    end 

    def receive_data(data) 
     log "Received #{data}" 
     server.channel.push data 
    end 

    def unbind 
     server.channel.unsubscribe @subscription 
     server.connections.delete(self) 
     log "Connection closed" 
    end 

    def log(msg) 
     puts "[#{self.object_id}] #{msg}" 
    end 
    end 
end 

EventMachine::run { 
    s = Messagebus::Server.new 
    s.start 
    puts "New server listening" 
}