2014-07-06 141 views
0

我在服務器上安裝了Mosquitto,它運行正常。我還安裝了Paho MQTT客戶端插件來做檢查,並寫了Android代碼,它在模擬器上工作正常。問題是:phpMQTT的發佈不起作用。

現在在我自己的電腦上一切工作正常:我運行PHP代碼後,我的Android應用程序和Paho客戶端都收到了該消息。

但是,當我把文件上傳到服務器並運行時,沒有任何反應,雖然PHP代碼成功執行了每一步,發佈消息時也沒有任何錯誤。甚至Mosquitto自帶的發佈者可執行文件也工作正常,客戶端收到了該消息。

只有這個服務器上的php代碼沒有做任何事情。

// Connect to the broker on localhost and publish four QoS 2 test messages
// on the topic "new", reporting the outcome of every step on stdout.
$mqtt = new phpMQTT("localhost", 1883, "server");

if (!$mqtt->connect()) {
    echo "couldn't connect\n";
} else {
    echo "connected\n";

    foreach (range(0, 3) as $i) {
        // publish() returns true when the message was handed to the broker.
        $sent = $mqtt->publish("new", "Hey . " . $i, 2);
        echo $sent ? "published\n" : "not published\n";
    }

    // Send DISCONNECT and release the socket.
    $mqtt->close();
}

Bluerhinos phpMQTT

<?php 
/* phpMQTT */ 
/*
 * phpMQTT: a small MQTT 3.1 ("MQIsdp", protocol level 3) client.
 *
 * Typical use: construct with the broker address, port and client id, call
 * connect(), then publish() and/or subscribe() + proc() in a loop, and
 * finally close().
 *
 * The socket is kept in non-blocking mode so that proc() can poll for data;
 * every other read or write waits with stream_select() for at most
 * $readtimeout seconds instead of spinning or hanging forever.
 */
class phpMQTT {

    private $socket;            /* stream resource of the broker connection (null/false when not connected) */
    private $msgid = 1;         /* next message id to hand out, always within 1..65535 */
    public $keepalive = 10;     /* keep-alive interval in seconds announced in CONNECT */
    public $timesinceping;      /* unix time of the last packet seen, used to detect disconnects */
    public $topics = array();   /* currently subscribed topics: filter => array('qos' => int, 'function' => callable) */
    public $debug = true;       /* when true, progress messages are echoed */
    public $address;            /* broker address */
    public $port;               /* broker port */
    public $clientid;           /* client id sent to the broker */
    public $will;               /* last will: array with keys topic, content, qos, retain */
    private $username;          /* stores username */
    private $password;          /* stores password */
    private $readtimeout = 5;   /* seconds a blocking read/write may stall before giving up */

    /* Stores the broker details; no connection is opened here. */
    public function __construct($address, $port, $clientid){
        $this->broker($address, $port, $clientid);
    }

    /* broker: sets the broker details used by the next connect() */
    public function broker($address, $port, $clientid){
        $this->address = $address;
        $this->port = $port;
        $this->clientid = $clientid;
    }

    /* connect: opens the socket and performs the CONNECT/CONNACK exchange.
       inputs: $clean    should the client send a clean session flag
               $will     optional last will array (topic, content, qos, retain)
               $username optional user name
               $password optional password
       returns: true when the broker accepted the connection, false otherwise
                (the reason is written with error_log) */
    public function connect($clean = true, $will = NULL, $username = NULL, $password = NULL){

        if($will) $this->will = $will;
        if($username) $this->username = $username;
        if($password) $this->password = $password;

        $this->socket = fsockopen($this->address, $this->port, $errno, $errstr, 60);

        if(!$this->socket){
            error_log("fsockopen() $errno, $errstr \n");
            return false;
        }

        stream_set_timeout($this->socket, 5);
        stream_set_blocking($this->socket, false);

        // Variable header: protocol name "MQIsdp" (length-prefixed) + protocol level 3
        $buffer = "\x00\x06MQIsdp\x03";
        $i = strlen($buffer);

        // Connect flags
        $var = 0;
        if($clean) $var += 2;

        if($this->will != NULL){
            $var += 4;                              // will flag
            $var += ($this->will['qos'] << 3);      // will qos
            if($this->will['retain']) $var += 32;   // will retain
        }

        if($this->username != NULL) $var += 128;
        if($this->password != NULL) $var += 64;

        $buffer .= chr($var); $i++;

        // Keep alive, big endian
        $buffer .= chr($this->keepalive >> 8); $i++;
        $buffer .= chr($this->keepalive & 0xff); $i++;

        // Payload: client id, then will topic/content, then credentials
        $buffer .= $this->strwritestring($this->clientid, $i);

        if($this->will != NULL){
            $buffer .= $this->strwritestring($this->will['topic'], $i);
            $buffer .= $this->strwritestring($this->will['content'], $i);
        }

        if($this->username) $buffer .= $this->strwritestring($this->username, $i);
        if($this->password) $buffer .= $this->strwritestring($this->password, $i);

        // The remaining length must use the variable-length encoding: a single
        // chr($i) produces a corrupt packet as soon as $i exceeds 127.
        $head = chr(0x10) . $this->setmsglength($i);

        if(!$this->writeall($head . $buffer)){
            error_log("Connection failed! (could not send CONNECT)\n");
            $this->dropsocket();
            return false;
        }

        // CONNACK is always 4 bytes: type, length, reserved, return code
        $string = $this->read(4);

        if(strlen($string) == 4 && (ord($string[0]) >> 4) == 2 && $string[3] == chr(0)){
            if($this->debug) echo "Connected to Broker\n";
        }else{
            if(strlen($string) == 4){
                error_log(sprintf("Connection failed! (Error: 0x%02x 0x%02x)\n",
                    ord($string[0]), ord($string[3])));
            }else{
                error_log("Connection failed! (no CONNACK received)\n");
            }
            $this->dropsocket();
            return false;
        }

        $this->timesinceping = time();

        return true;
    }

    /* read: reads up to $int bytes from the broker.
       inputs: $int number of bytes wanted
               $nb  when true, do one non-blocking fread() and return its raw
                    result (possibly empty or false)
       returns: in blocking mode, a string that is shorter than $int only if
                the connection reached EOF or no data arrived for
                $readtimeout seconds */
    public function read($int = 8192, $nb = false){

        if(!is_resource($this->socket)){
            return "";
        }

        if($nb){
            return fread($this->socket, $int);
        }

        $string = "";
        $togo = $int;
        $deadline = microtime(true) + $this->readtimeout;

        while($togo > 0 && !feof($this->socket)){
            $fread = fread($this->socket, $togo);

            if($fread === false || $fread === ""){
                // Nothing available yet: sleep in stream_select() until data
                // arrives rather than spinning on the non-blocking socket.
                $remaining = $deadline - microtime(true);
                if($remaining <= 0) break;

                $r = array($this->socket);
                $w = null;
                $e = null;
                $sec = (int)$remaining;
                $usec = (int)(($remaining - $sec) * 1000000);
                if(stream_select($r, $w, $e, $sec, $usec) === false) break;
                continue;
            }

            $string .= $fread;
            $togo = $int - strlen($string);
            // The timeout is an idle timeout: restart it after every chunk
            $deadline = microtime(true) + $this->readtimeout;
        }

        return $string;
    }

    /* subscribe: subscribes to topics and waits for the SUBACK.
       inputs: $topics array keyed by topic filter; each value is an array
                       with 'qos' (requested QoS) and 'function' (callback
                       invoked as callback($topic, $message))
               $qos    kept for backward compatibility only; the SUBSCRIBE
                       fixed header always carries QoS 1 as the MQTT
                       specification requires */
    public function subscribe($topics, $qos = 0){
        $i = 0;
        $id = $this->nextmsgid();
        $idbytes = chr($id >> 8) . chr($id & 0xff);
        $buffer = $idbytes; $i += 2;

        foreach($topics as $key => $topic){
            $buffer .= $this->strwritestring($key, $i);
            $buffer .= chr($topic["qos"]); $i++;
            $this->topics[$key] = $topic;
        }

        // 0x80 = SUBSCRIBE, 0x02 = mandatory QoS 1 flag
        $head = chr(0x82) . $this->setmsglength($i);

        if($this->writeall($head . $buffer)){
            // 9 = SUBACK; anything else that arrives first is dispatched normally
            $this->awaitpacket(9, $idbytes);
        }
    }

    /* ping: sends a keep alive ping (PINGREQ) */
    public function ping(){
        $this->writeall(chr(0xc0) . chr(0x00));
        if($this->debug) echo "ping sent\n";
    }

    /* disconnect: sends a proper DISCONNECT packet */
    public function disconnect(){
        $this->writeall(chr(0xe0) . chr(0x00));
    }

    /* close: sends a proper disconnect, then closes the socket */
    public function close(){
        $this->disconnect();
        $this->dropsocket();
    }

    /* publish: publishes $content on a $topic.
       inputs: $topic   topic name
               $content message payload (sent as-is)
               $qos     0, 1 or 2
               $retain  non-zero to set the retain flag
       returns: true when the packet was written and, for QoS 1/2, the broker
                completed the acknowledgement exchange; false otherwise.
       For QoS 2 the broker may hold the message until it receives PUBREL, so
       the PUBREC/PUBREL/PUBCOMP exchange is completed here. */
    public function publish($topic, $content, $qos = 0, $retain = 0){

        $i = 0;
        $buffer = "";
        $idbytes = "";

        $buffer .= $this->strwritestring($topic, $i);

        if($qos){
            $id = $this->nextmsgid();
            $idbytes = chr($id >> 8) . chr($id & 0xff);
            $buffer .= $idbytes; $i += 2;
        }

        $buffer .= $content;
        $i += strlen($content);

        $cmd = 0x30;
        if($qos) $cmd += $qos << 1;
        if($retain) $cmd += 1;

        $head = chr($cmd) . $this->setmsglength($i);

        if(!$this->writeall($head . $buffer)){
            error_log("publish failed: could not write to the broker socket\n");
            return false;
        }

        if($qos == 1){
            // 4 = PUBACK
            if(!$this->awaitpacket(4, $idbytes)) return false;
        }elseif($qos == 2){
            // 5 = PUBREC, then PUBREL (0x62: type 6 with the mandatory QoS 1
            // flag), then 7 = PUBCOMP
            if(!$this->awaitpacket(5, $idbytes)) return false;
            if(!$this->writeall(chr(0x62) . chr(0x02) . $idbytes)) return false;
            if(!$this->awaitpacket(7, $idbytes)) return false;
        }

        if($this->debug) echo "done: ".$this->socket."<br>";
        return true;
    }

    /* message: processes a received PUBLISH body (2-byte topic length, topic,
       payload) and calls the callback of every subscription whose filter
       matches the topic. */
    public function message($msg){
        if(strlen($msg) < 2){
            if($this->debug) echo "malformed publish packet ignored\n";
            return;
        }

        $tlen = (ord($msg[0]) << 8) + ord($msg[1]);
        $topic = (string)substr($msg, 2, $tlen);
        $msg = (string)substr($msg, $tlen + 2);
        $found = 0;

        foreach($this->topics as $key => $top){
            if(!$this->topicmatches($key, $topic)) continue;

            if(isset($top['function']) && is_callable($top['function'])){
                call_user_func($top['function'], $topic, $msg);
                $found = 1;
            }
        }

        if($this->debug && !$found) echo "msg recieved but no match in subscriptions\n";
    }

    /* proc: one iteration of the processing loop for an "always on" client.
       Reconnects on EOF, dispatches at most one pending packet, sends a ping
       once the keep-alive interval has elapsed without traffic and reconnects
       after twice that interval.
       inputs: $loop when true and nothing is pending, sleeps 100 ms so that a
                     tight caller loop does not burn CPU
       returns: always 1 */
    public function proc($loop = true){

        if(!is_resource($this->socket) || feof($this->socket)){
            if($this->debug) echo "eof receive going to reconnect for good measure\n";
            if(!$this->reconnect()){
                if($loop) usleep(100000);
                return 1;
            }
        }

        // Non-blocking poll for the first byte of the next packet
        $byte = (string)$this->read(1, true);

        if(!strlen($byte)){
            if($loop){
                usleep(100000);
            }
        }else{
            $header = ord($byte);
            $cmd = $header >> 4;
            if($this->debug) echo "Recevid: $cmd\n";

            $value = $this->readmsglength();
            if($value !== false){
                if($this->debug) echo "Fetching: $value\n";
                // Blocking read: the body must be complete before it is parsed
                $string = $value ? $this->read($value) : "";
            }

            if($value === false || strlen($string) !== $value){
                // The stream is out of sync with packet boundaries; start over
                if($this->debug) echo "incomplete packet, reconnecting\n";
                $this->reconnect();
                return 1;
            }

            if($cmd){
                $this->handlepacket($header, $string);
                $this->timesinceping = time();
            }
        }

        if($this->timesinceping < (time() - $this->keepalive)){
            if($this->debug) echo "not found something so ping\n";
            $this->ping();
        }

        if($this->timesinceping < (time() - ($this->keepalive * 2))){
            if($this->debug) echo "not seen a package in a while, disconnecting\n";
            $this->reconnect();
        }

        return 1;
    }

    /* getmsglength: decodes the variable-length "remaining length" field that
       starts at offset $i of $msg; $i is advanced past the field.
       returns: the decoded length */
    public function getmsglength(&$msg, &$i){

        $multiplier = 1;
        $value = 0;
        do{
            if(!isset($msg[$i])) break;    // ran off the end of the buffer
            $digit = ord($msg[$i]);
            $value += ($digit & 127) * $multiplier;
            $multiplier *= 128;
            $i++;
        }while(($digit & 128) != 0);

        return $value;
    }

    /* setmsglength: encodes $len as an MQTT variable-length field
       (7 bits per byte, top bit set while more bytes follow) */
    public function setmsglength($len){
        $string = "";
        do{
            $digit = $len % 128;
            $len = $len >> 7;
            // if there are more digits to encode, set the top bit of this digit
            if($len > 0)
                $digit = ($digit | 0x80);
            $string .= chr($digit);
        }while($len > 0);
        return $string;
    }

    /* strwritestring: returns $str prefixed with its 2-byte big-endian length
       and adds the number of bytes produced to $i */
    public function strwritestring($str, &$i){
        $len = strlen($str);
        $ret = chr($len >> 8) . chr($len % 256) . $str;
        $i += ($len + 2);
        return $ret;
    }

    /* printstr: debugging aid that dumps every byte of $string as
       index, binary, hex and printable character */
    public function printstr($string){
        $strlen = strlen($string);
        for($j = 0; $j < $strlen; $j++){
            $num = ord($string[$j]);
            if($num > 31)
                $chr = $string[$j]; else $chr = " ";
            printf("%4d: %08b : 0x%02x : %s \n", $j, $num, $num, $chr);
        }
    }

    /* nextmsgid: returns the next message id and advances the counter,
       wrapping from 65535 back to 1 (0 is not a valid message id) */
    private function nextmsgid(){
        $id = $this->msgid;
        $this->msgid = ($id >= 0xffff) ? 1 : $id + 1;
        return $id;
    }

    /* dropsocket: closes the socket if it is open and forgets it */
    private function dropsocket(){
        if(is_resource($this->socket)){
            fclose($this->socket);
        }
        $this->socket = null;
    }

    /* reconnect: closes the current socket, connects again without the clean
       session flag and re-subscribes to the known topics.
       returns: true when the new connection was accepted */
    private function reconnect(){
        $this->dropsocket();
        if(!$this->connect(false)){
            return false;
        }
        if(count($this->topics)){
            $this->subscribe($this->topics);
        }
        return true;
    }

    /* writeall: writes the whole of $data to the non-blocking socket, waiting
       for it to become writable when the send buffer is full.
       returns: true when every byte was written */
    private function writeall($data){
        if(!is_resource($this->socket)){
            return false;
        }

        $total = strlen($data);
        $sent = 0;
        $stalled = false;

        while($sent < $total){
            $n = fwrite($this->socket, substr($data, $sent));
            if($n === false){
                return false;
            }
            if($n === 0){
                // Two empty writes in a row, with a "writable" signal in
                // between, mean the connection is dead.
                if($stalled) return false;
                $stalled = true;

                $r = null;
                $w = array($this->socket);
                $e = null;
                if(!stream_select($r, $w, $e, $this->readtimeout)) return false;
                continue;
            }
            $stalled = false;
            $sent += $n;
        }

        return true;
    }

    /* readmsglength: reads the variable-length "remaining length" field from
       the socket (at most 4 bytes).
       returns: the decoded length, or false on a short read or malformed field */
    private function readmsglength(){
        $multiplier = 1;
        $value = 0;

        for($n = 0; $n < 4; $n++){
            $raw = $this->read(1);
            if(strlen($raw) !== 1){
                return false;
            }
            $digit = ord($raw);
            $value += ($digit & 127) * $multiplier;
            if(($digit & 128) == 0){
                return $value;
            }
            $multiplier *= 128;
        }

        return false;
    }

    /* awaitpacket: reads packets until one of type $type carrying the message
       id $idbytes arrives. Other packets that arrive first are dispatched via
       handlepacket().
       returns: true when the expected packet arrived, false on timeout or a
                broken stream */
    private function awaitpacket($type, $idbytes){
        $deadline = time() + $this->readtimeout;

        while(time() <= $deadline){
            $first = $this->read(1);
            if(strlen($first) !== 1) return false;

            $len = $this->readmsglength();
            if($len === false) return false;

            $body = $len ? $this->read($len) : "";
            if(strlen($body) !== $len) return false;

            $header = ord($first);
            $this->timesinceping = time();

            if(($header >> 4) == $type && substr($body, 0, 2) === $idbytes){
                return true;
            }

            $this->handlepacket($header, $body);
        }

        return false;
    }

    /* handlepacket: acts on one complete incoming packet.
       PUBLISH (3): strips the message id that QoS 1/2 packets carry after the
                    topic, hands the rest to message() and acknowledges with
                    PUBACK (QoS 1) or PUBREC (QoS 2).
       PUBREL  (6): answered with PUBCOMP.
       Every other packet type is ignored. */
    private function handlepacket($header, $body){
        switch($header >> 4){
            case 3:
                $qos = ($header >> 1) & 0x03;
                $idbytes = "";

                if($qos > 0){
                    if(strlen($body) < 2) break;
                    $tlen = (ord($body[0]) << 8) + ord($body[1]);
                    if(strlen($body) < $tlen + 4) break;
                    $idbytes = substr($body, $tlen + 2, 2);
                    $body = substr($body, 0, $tlen + 2) . substr($body, $tlen + 4);
                }

                $this->message($body);

                if($qos == 1){
                    $this->writeall(chr(0x40) . chr(0x02) . $idbytes);
                }elseif($qos == 2){
                    $this->writeall(chr(0x50) . chr(0x02) . $idbytes);
                }
            break;

            case 6:
                if(strlen($body) >= 2){
                    $this->writeall(chr(0x70) . chr(0x02) . substr($body, 0, 2));
                }
            break;
        }
    }

    /* topicmatches: tests $topic against a subscription $filter in which "+"
       stands for any run of characters without "/" and "#" for any run of
       characters; every other character is matched literally. */
    private function topicmatches($filter, $topic){
        $regex = "";
        $parts = preg_split('/([+#])/', (string)$filter, -1, PREG_SPLIT_DELIM_CAPTURE);

        foreach($parts as $part){
            if($part === "+"){
                $regex .= '[^\/]*';
            }elseif($part === "#"){
                $regex .= '.*';
            }else{
                $regex .= preg_quote($part, '/');
            }
        }

        return preg_match('/^' . $regex . '$/', $topic) === 1;
    }
}

?> 

回答

0

請參照這個庫的發佈示例,並查看這兩個日誌會顯示什麼:本地主機上代理(broker)的日誌和PHP的日誌。

https://github.com/bluerhinos/phpMQTT/blob/master/examples/publish.php

+0

感謝您的答覆,我發現MQTT僅限於單個端口上的1024個連接,所以我轉向了AMQP,這樣會好很多。 –

+0

這可能是Mosquitto默認代理設置的限制,但它本身不是MQTT協議限制: https://twitter.com/saradhix/status/28025989189 此外,您可以嘗試其他可擴展代理,如:ActiveMQ ,RabbitMQ或HiveMQ。 – Teixi

+0

是的,我正在使用RabbitMQ,這是最好的。 –