2015-02-09 70 views
2

我嘗試構建一個小實時websocket用例,用戶可以登錄並查看所有其他登錄用戶,在新用戶登錄或現有用戶註銷。React/Ratchet/ZMQ中的多種訂閱方法的最佳實踐

對於這種情況,當用戶登錄或註銷時,我在UserController中使用ZMQ PUSH套接字。

UserConstroller

public function login() { 

     //... here is the auth code, model call etc... 

     $aUserData = array();// user data comes from the database with username, logintime, etc.... 

     $context = new \ZMQContext(); 
     $oSocket = $context->getSocket(\ZMQ::SOCKET_PUSH, 'USER_LOGIN_PUSHER'); // use persistent_id 
     if($oSocket instanceof \ZMQSocket) { 

      $oSocket->connect("tcp://127.0.0.1:5555"); // 
      $oSocket->send(json_encode($aUserData)); 
     } 
    } 

    public function logout() { 
     //... here is the logout code, model call etc .... 

     $aUserData = array();// user data comes from the SESSION with username, logintime, etc.... 

     $context = new \ZMQContext(); 
     $oSocket = $context->getSocket(\ZMQ::SOCKET_PUSH, 'USER_LOGOUT_PUSHER'); // use persistent_id 
     if($oSocket instanceof \ZMQSocket) { 

      $oSocket->connect("tcp://127.0.0.1:5555"); // 
      $oSocket->send(json_encode($aUserData)); 
     } 
    } 

然後我有一個推類像棘輪文檔:link

在這個類有兩種方法:onUserLoginonUserLogout當然所有其他的東西,如

onSubscribe,的OnOpen,onPublish

UserInformationPusher

public function onUserLogin($aUserData) { 
     //var_dump("onUserLogin"); 
     $sUserData = json_decode($aUserData, true); 

     $oTopic = $this->subscribedTopics["user_login"]; 

     if($oTopic instanceof Topic) { 
      $oTopic->broadcast($sUserData); 
     } else { 
      return; 
     } 
    } 

    public function onUserLogout($aUserData) { 
     //var_dump("onUserLogout"); 
     $entryData = json_decode($aUserData, true); 

     $oTopic = $this->subscribedTopics["user_logout"]; 

     if($oTopic instanceof Topic) { 
      $oTopic->broadcast($entryData); 
     } else { 
      return; 
     } 
    } 

的最後一部分是WampServer/WsServer/HttpServer的與一個循環,收聽傳入連接。還有我的ZMQ PULL插座

RatchetServerConsole

public function start_server() { 

     $oPusher = new UserInformationPusher(); 

     $oLoop = \React\EventLoop\Factory::create(); 
     $oZMQContext = new \React\ZMQ\Context($oLoop); 
     $oPullSocket = $oZMQContext->getSocket(\ZMQ::SOCKET_PULL); 

     $oPullSocket->bind('tcp://127.0.0.1:5555'); // Binding to 127.0.0.1 means the only client that can connect is itself 
     $oPullSocket->on('message', array($oPusher, 'onUserLogin')); 
     $oPullSocket->on('message', array($oPusher, 'onUserLogout')); 


     $oMemcache = new \Memcache(); 
     $oMemcache->connect('127.0.0.1', 11211); 
     $oMemcacheHandler = new Handler\MemcacheSessionHandler($oMemcache); 

     $oSession = new SessionProvider(
      new \Ratchet\Wamp\WampServer(
       $oPusher 
      ), 
      $oMemcacheHandler 
     ); 

     //$this->Output->info("Server start initiation with memcache!..."); 
     $webSock = new \React\Socket\Server($oLoop); 
     $webSock->listen(8080, '0.0.0.0'); // Binding to 0.0.0.0 means remotes can connect 
     $oServer = new \Ratchet\Server\IoServer(
      new \Ratchet\Http\HttpServer(
       new \Ratchet\WebSocket\WsServer(
        $oSession 
       ) 
      ), 
      $webSock 
     ); 
     $this->Output->info("Server started "); 
     $oLoop->run(); 

    } 

在這個例子中,調用從登錄()或退出()總是會調用兩種方法(onUserLogin和onUserLogout)。 我無法找到一些文檔,它描述了我可以在($ event,callable $ listener)方法上使用哪些事件,有沒有人有鏈接/知識庫? 什麼是最好的方法來檢查從UserController哪個方法被解僱?

  • 我可以在控制器添加一些信息到$ sUserData,並在推檢查這
  • 我可以綁定的其他套接字連接到不同的端口(例如5554爲推拉),並使用上( )方法
  • 我可以...是否有另一個最佳實踐來解決這個問題?

沒有必要因爲它的客戶端代碼工作正常

+0

你能否提一下訪問「new \ ZMQContext();」的步驟?因爲我無法這樣做。我已經問過這個問題在這裏http://stackoverflow.com/questions/41054517/cakephp-3-react-zmq-library-namespace – i01000001 2016-12-09 07:00:42

回答

2

的密集處理與PHP的最佳實踐一個月以後在WebSockets的,我從我的做法改爲Crossbar.iovoryx/Thruway在PHP後臺和Autobahn|JS在前端。 所有這些組件都支持WAMP V2 Websocket Standard,並且能夠處理我的要求。

如果有一些要求,我可以將解決方案發布到上面的問題中,使用上述組件。

+1

請張貼您的解決方案,不可能不使用WAMP V2? – 2015-10-05 20:42:10

2

在你RatchetServerConsole

刪除,

$oPullSocket->on('message', array($oPusher, 'onUserLogin')); 
$oPullSocket->on('message', array($oPusher, 'onUserLogout')); 

添加,

$oPullSocket->on('message', array($oPusher, 'onUserActionBroadcast')); 

在你UserInformationPusher

刪除onUserLogin()和onUserLogout()。

添加,

public function onUserActionBroadcast($aUserData) 
{ 
    $entryData = json_decode($aUserData, true); 

    // If the lookup topic object isn't set there is no one to publish to 
    if (!array_key_exists($entryData['topic'], $this->subscribedTopics)) { 
     return; 
    } 

    $topic = $this->subscribedTopics[$entryData['topic']]; 

    unset($entryData['topic']); 

    // re-send the data to all the clients subscribed to that category 
    $topic->broadcast($entryData); 
} 

UserConstroller(添加在$ aUserData的話題),

public function login() { 

    //... here is the auth code, model call etc... 

    $aUserData = array();// user data comes from the database with username, logintime, etc.... 

    $aUserData['topic'] = 'USER_LOGIN'; // add the topic name 

    $context = new \ZMQContext(); 
    $oSocket = $context->getSocket(\ZMQ::SOCKET_PUSH, 'my pusher'); // use persistent_id 
    if($oSocket instanceof \ZMQSocket) { 

     $oSocket->connect("tcp://127.0.0.1:5555"); // 
     $oSocket->send(json_encode($aUserData)); 
    } 
} 

public function logout() { 
    //... here is the logout code, model call etc .... 

    $aUserData = array();// user data comes from the SESSION with username, logintime, etc.... 

    $aUserData['topic'] = 'USER_LOGOUT'; // add the topic name 

    $context = new \ZMQContext(); 
    $oSocket = $context->getSocket(\ZMQ::SOCKET_PUSH, 'my pusher'); // use persistent_id 
    if($oSocket instanceof \ZMQSocket) { 

     $oSocket->connect("tcp://127.0.0.1:5555"); // 
     $oSocket->send(json_encode($aUserData)); 
    } 
} 

視圖文件

最後,

<script> 
var conn = new ab.Session('ws://yourdomain.dev:9000', // Add the correct domain and port here 
    function() { 
     conn.subscribe('USER_LOGIN', function(topic, data) {     
      console.log(topic); 
      console.log(data); 
     }); 

     conn.subscribe('USER_LOGOUT', function(topic, data) {     
      console.log(topic); 
      console.log(data); 
     }); 
    }, 
    function() { 
     console.warn('WebSocket connection closed'); 
    }, 
    {'skipSubprotocolCheck': true} 
); 
</script> 

注意:基本思想是在推動者類中使用單個廣播函數。