2011-06-03 150 views
4

我使用ruby-smpp和redis來實現基於隊列的後臺工作來發送SMPP消息。我是否正確使用eventmachine?

而且我想知道我是否以正確的方式使用eventmachine。它的工作原理,但它感覺不對。

#!/usr/bin/env ruby 

# Sample SMS gateway that can receive MOs (mobile originated messages) and 
# DRs (delivery reports), and send MTs (mobile terminated messages). 
# MTs are, in the name of simplicity, entered on the command line in the format 
# <sender> <receiver> <message body> 
# MOs and DRs will be dumped to standard out. 

require 'smpp' 
require 'redis/connection/hiredis' 
require 'redis' 
require 'yajl' 
require 'time' 

LOGFILE = File.dirname(__FILE__) + "/sms_gateway.log" 
PIDFILE = File.dirname(__FILE__) + '/worker_test.pid' 
Smpp::Base.logger = Logger.new(LOGFILE) 
#Smpp::Base.logger.level = Logger::WARN 

REDIS = Redis.new 

class MbloxGateway 

    # MT id counter. 
    @@mt_id = 0 

    # expose SMPP transceiver's send_mt method 
    def self.send_mt(sender, receiver, body) 

    if sender =~ /[a-z]+/i 
     source_addr_ton = 5 
    else 
     source_addr_ton = 2 
    end 

    @@mt_id += 1 
    @@tx.send_mt(('smpp' + @@mt_id.to_s), sender, receiver, body, { 
     :source_addr_ton => source_addr_ton 
    # :service_type => 1, 
    # :source_addr_ton => 5, 
    # :source_addr_npi => 0 , 
    # :dest_addr_ton => 2, 
    # :dest_addr_npi => 1, 
    # :esm_class => 3 , 
    # :protocol_id => 0, 
    # :priority_flag => 0, 
    # :schedule_delivery_time => nil, 
    # :validity_period => nil, 
    # :registered_delivery=> 1, 
    # :replace_if_present_flag => 0, 
    # :data_coding => 0, 
    # :sm_default_msg_id => 0 
    #  
    }) 
    end 

    def logger 
    Smpp::Base.logger 
    end 

    def start(config) 
    # Write this workers pid to a file 
    File.open(PIDFILE, 'w') { |f| f << Process.pid } 
    # The transceiver sends MT messages to the SMSC. It needs a storage with Hash-like 
    # semantics to map SMSC message IDs to your own message IDs. 
    pdr_storage = {} 

    # Run EventMachine in loop so we can reconnect when the SMSC drops our connection. 
    loop do 
     EventMachine::run do    
     @@tx = EventMachine::connect(
      config[:host], 
      config[:port], 
      Smpp::Transceiver, 
      config, 
      self # delegate that will receive callbacks on MOs and DRs and other events 
     ) 

     # Let the connection start before we check for messages 
     EM.add_timer(3) do 
     # Maybe there is some better way to do this. IDK, But it works! 
     EM.defer do 
      loop do 
      # Pop a message 
      message = REDIS.lpop 'messages:send:queue' 
      if message # If there is a message. Process it and check the queue again 
       message = Yajl::Parser.parse(message, :check_utf8 => false) # Parse the message from Json to Ruby hash 
       if !message['send_after'] or (message['send_after'] and Time.parse(message['send_after']) < Time.now) 

       self.class.send_mt(message['sender'], message['receiver'], message['body']) # Send the message 
       REDIS.publish 'log:messages', "#{message['sender']} -> #{message['receiver']}: #{message['body']}" # Push the message to the redis queue so we can listen to the channel 
       else 
       REDIS.lpush 'messages:queue', Yajl::Encoder.encode(message) 
       end 
      else # If there is no message. Sleep for a second 
       sleep 1 
      end 
      end 
     end 
     end 
    end 
     sleep 2 
    end 
    end 

    # ruby-smpp delegate methods 

    def mo_received(transceiver, pdu) 
    logger.info "Delegate: mo_received: from #{pdu.source_addr} to #{pdu.destination_addr}: #{pdu.short_message}" 
    end 

    def delivery_report_received(transceiver, pdu) 
    logger.info "Delegate: delivery_report_received: ref #{pdu.msg_reference} stat #{pdu.stat}" 
    end 

    def message_accepted(transceiver, mt_message_id, pdu) 
    logger.info "Delegate: message_accepted: id #{mt_message_id} smsc ref id: #{pdu.message_id}" 
    end 

    def message_rejected(transceiver, mt_message_id, pdu) 
    logger.info "Delegate: message_rejected: id #{mt_message_id} smsc ref id: #{pdu.message_id}" 
    end 

    def bound(transceiver) 
    logger.info "Delegate: transceiver bound" 
    end 

    def unbound(transceiver) 
    logger.info "Delegate: transceiver unbound" 
    EventMachine::stop_event_loop 
    end 

end 

# Start the Gateway 
begin 
    puts "Starting SMS Gateway. Please check the log at #{LOGFILE}" 

    # SMPP properties. These parameters work well with the Logica SMPP simulator. 
    # Consult the SMPP spec or your mobile operator for the correct settings of 
    # the other properties. 
    config = { 
    :host => 'server.com', 
    :port => 3217, 
    :system_id => 'user', 
    :password => 'password', 
    :system_type => 'type', # default given according to SMPP 3.4 Spec 
    :interface_version => 52, 
    :source_ton => 0, 
    :source_npi => 1, 
    :destination_ton => 1, 
    :destination_npi => 1, 
    :source_address_range => '', 
    :destination_address_range => '', 
    :enquire_link_delay_secs => 10 
    } 
    gw = MbloxGateway.new 
    gw.start(config) 

rescue Exception => ex 
    puts "Exception in SMS Gateway: #{ex} at #{ex.backtrace.join("\n")}" 
end 

回答

13

一些簡單的步驟,以使代碼更EventMachine的肥胖型:

  • 擺脫阻塞Redis的驅動程序,使用
  • 停止使用推遲EM-hiredis。使用Redis驅動程序推送到線程會使事情變得更糟,因爲它依賴於它使用的套接字周圍的鎖。
  • 擺脫add_timer(3)
  • 擺脫內部循環,通過使用EM.next_tick重新安排下一個事件循環的塊來取代它。外面的那個有點不必要。你也不應該繞着EM.run循環,通過調用@@ tx.reconnect,通過在你的未綁定方法中重新連接來正確處理斷開連接,而不是停止和重新啓動事件循環,會更清潔。
  • 不要睡覺,只是等待。 EventMachine會告訴你什麼時候新的東西進入網絡套接字。

這裏的周圍EventMachine的核心代碼是如何將看起來像一些改進:

def start(config) 
    File.open(PIDFILE, 'w') { |f| f << Process.pid } 
    pdr_storage = {} 

    EventMachine::run do 
    @@tx = EventMachine::connect(
     config[:host], 
     config[:port], 
     Smpp::Transceiver, 
     config, 
     self 
    ) 
    REDIS = EM::Hiredis.connect 

    pop_message = lambda do 
     REDIS.lpop 'messages:send:queue' do |message| 
     if message # If there is a message. Process it and check the queue again 
      message = Yajl::Parser.parse(message, :check_utf8 => false) # Parse the message from Json to Ruby hash 
      if !message['send_after'] or (message['send_after'] and Time.parse(message['send_after']) < Time.now) 
      self.class.send_mt(message['sender'], message['receiver'], message['body']) 
      REDIS.publish 'log:messages', "#{message['sender']} -> #{message['receiver']}: #{message['body']}" 
      else 
      REDIS.lpush 'messages:queue', Yajl::Encoder.encode(message) 
      end 
     end 
     EM.next_tick &pop_message 
     end 
    end 
    end 
end 

不是完美的,可以使用一些過於清理,但這更應該是什麼樣子以EventMachine方式。不睡覺,儘可能避免使用延遲,並且不要使用可能阻塞的網絡驅動程序,通過在下一個反應堆環路上重新安排時間來實現傳統環路。在Redis方面,差異並不大,但它更像EventMachine-y這種方式。

希望這會有所幫助。如果您仍然有疑問,很樂意進一步解釋。

+0

我認爲模塊::方法(冒號)的語法有點被棄用,而最近被人們所青睞。你有沒有理由在這裏使用冒號? – 2011-06-05 14:53:23

+1

這是來自我認爲我複製的原始代碼的剩餘部分。我更喜歡點語法,但我仍然看到兩者都用於代碼。 – roidrage 2011-06-05 14:56:54

+0

非常感謝!這很棒!但你會介意檢查我的最終代碼在這裏工作很好:https://gist.github.com/1009106。 但我有一些想法: 1.我覺得沒有必要在每個tick上查詢redis。或者它可能不會打擾redis這麼多? 2.我的新代碼是否正確。現在就足夠了嗎? ;) 3.我個人不喜歡lambda。我可以將它移到一個方法上嗎? 4.我正在考慮使用Kernel.fork在子進程中創建2個工作子進程,並讓母進程監視孩子。你認爲它會起作用嗎?它有什麼用處? – Lisinge 2011-06-05 16:12:13

2

您正在阻止EM的反應器循環中的Redis調用。它有效,但不是要走的路。您可以查看em-hiredis以正確地將Redis呼叫與EM集成。

+0

是的,謝謝。我是新的eventmachine。如何在不使用醜陋循環的情況下實現redis拉片?你能舉一個例子來說明如何解決這個問題嗎? – Lisinge 2011-06-03 22:30:38