2014-09-06 11 views
7

Sto sviluppando un gioco HTML5 (www.titansoftime.com). Uso di pthreads PHP con i WebSocket di Ratchet

Sto usando Ratchet come soluzione server WebSocket in PHP. Funziona alla grande! http://socketo.me/docs/push

Ho eseguito diversi test standalone utilizzando l'estensione php pthreads e ho visto alcuni risultati molto interessanti. Funziona davvero e funziona bene .. finché i web socket non sono nel mix.

I pthread offrono funzionalità di multithreading PHP (funzionano davvero ed è incredibile). http://php.net/manual/en/book.pthreads.php

Questo è quello che faccio:

/src/server.php Questo è il file che avvia il demone.

<?php
    // Bootstrap script for the WebSocket daemon: builds the event loop,
    // schedules the periodic push, binds the listening socket and then
    // hands control to the loop.
    session_start();

    use Ratchet\Server\IoServer;
    use Ratchet\WebSocket\WsServer;
    use MyApp\Pusher;

    // Composer autoloader first, then the project's own class files.
    require __DIR__ . '/../vendor/autoload.php';

    require_once __DIR__ . '/../mysql.cls.php';
    require_once __DIR__ . '/../game.cls.php';
    require_once __DIR__ . '/../model.cls.php';

    // File-scope objects; other code reaches $game through `global $game`.
    $mysql = new mysql();
    $game = new game();

    $loop = \React\EventLoop\Factory::create();
    $pusher = new Pusher();

    // Push to the connected clients twice per second.
    $pushTick = function () use ($pusher) {
        $pusher->load();
    };
    $loop->addPeriodicTimer(0.5, $pushTick);

    $webSock = new \React\Socket\Server($loop);

    // Report when the libevent-backed loop implementation was selected.
    if ($loop instanceof \React\EventLoop\LibEventLoop) {
        echo "\n HAS LibEvent";
    }

    // Binding to 0.0.0.0 means remotes can connect
    $webSock->listen(8080, '0.0.0.0');

    // Application stack: WebSocket protocol handling inside the HTTP layer.
    $webServer = new IoServer(
        new \Ratchet\Http\HttpServer(new WsServer($pusher)),
        $webSock
    );

    $loop->run();

Tutto funziona correttamente.

/src/MyApp/Pusher.php Questa classe invia dati a tutti gli utenti connessi.

<?php 
namespace MyApp; 
use Ratchet\ConnectionInterface; 
use Ratchet\MessageComponentInterface; 

/**
 * Thread that performs the per-client work for one connection: it looks up
 * the user row matching the connection's resource id and sends a JSON
 * payload to that client.
 *
 * NOTE(review): the caller reports a fatal "cannot serialize closure" error
 * when a connection object is stored on this thread. pthreads presumably
 * serializes non-Threaded objects assigned to Thread properties — confirm
 * against the pthreads documentation before relying on $client here.
 */
class AsyncThread extends \Thread{

    /** @var object Connection to work on; read for ->resourceId and ->send(). */
    public $client;

    /**
     * @param object $client Connection this thread will serve.
     */
    public function __construct($client){
        $this->client = $client;
    }

    /**
     * Thread body: load the user bound to the connection, then push data to it.
     */
    public function run(){

        // do work on $this->client

        // The id is concatenated into SQL, so force it to an integer first.
        $connectionId = intval($this->client->resourceId);

        // The leading backslash is required: this file lives in namespace
        // MyApp, and unqualified class names do not fall back to the global
        // namespace, so plain `mysql::` would look for MyApp\mysql.
        $user = \mysql::assoc('SELECT * from users WHERE connection_id = "'.$connectionId.'"');
        // etc..
        $this->client->send(json_encode(array('foo'=>'bar')));

    }

}

/**
 * WebSocket application component: keeps the list of clients that have sent
 * the "connect" task and pushes data to each of them whenever load() runs
 * (server.php calls load() from a periodic timer).
 */
class Pusher implements MessageComponentInterface{

    /** @var ConnectionInterface[] Connections registered through the "connect" task. */
    public static $clients = array();

    /**
     * Push one round of data to every registered client.
     *
     * When no client is registered, the database link is pinged and
     * re-established if the ping fails.
     */
    public static function load(){
        // $game is created at file scope in server.php; without this import
        // the variable would be undefined inside the method.
        global $game;

        $client_count = count(self::$clients);

        echo "\n\n\n".'Serving to '.$client_count.' clients. '.time();

        if($client_count === 0){
            if(!mysql_ping()){
                $game->connect();
            }
        }

        $threads = array();
        foreach(self::$clients as $key => $client){

            // HANDLE CLIENT

            // This works just fine, the only problem is that if I have lets say 50 simultaneous users, the people near the end of the clients array will have to wait till the other users have been processed. This is not desirable
            // (json_encode takes a single value, so the payload must be wrapped in array().)
            $client->send(json_encode(array('foo'=>'bar')));

            // So I tried this:
            $threads[$key] = new AsyncThread($client);
            $threads[$key]->start();

            // At this point the AsyncThread class will throw a fatal error complaining about not being able to serialize a closure.
            // If I dont set "$this->client = $client;" in the thread constructor no error appears but now I cant use the data.

            // Also regardless of whether or not I bind the data in the AsyncThread constructor,
            // the connection disappears if I call "new AsyncThread($client)". I cannot explain this behavior.

        }

    }

    /**
     * Handle an incoming message. The only accepted message is a JSON object
     * whose "task" is the string "connect"; anything else closes the connection.
     *
     * @param ConnectionInterface $from Connection that sent the message.
     * @param string              $msg  Raw message payload.
     */
    public function onMessage(ConnectionInterface $from, $msg) {
        if(!$msg){
            echo "\n NO MSG";
            $this->closeConnection($from);
            return;
        }

        $data = json_decode($msg);
        if(!$data){
            echo "\n NO DATA";
            $this->closeConnection($from);
            return;
        }

        // Valid JSON need not be an object with a "task" member; read it defensively.
        $task = (is_object($data) && isset($data->task)) ? $data->task : null;

        // Strict comparison: with loose matching, values such as true (or 0
        // on older PHP versions) would be treated as "connect".
        if($task === 'connect'){
            echo "\n".'New connection! ('.$from->resourceId.') '.$from->remoteAddress;
            // Register each connection once, otherwise a repeated "connect"
            // makes load() serve the same client several times per round.
            if(!in_array($from, self::$clients, true)){
                self::$clients[] = $from;
            }
            return;
        }

        $this->closeConnection($from);
        echo "\nNO TASK CLOSING";
    }

    /**
     * Close a connection, remove it from the registered clients and reset
     * its connection_id in the users table.
     *
     * @param ConnectionInterface|null $conn Connection to close; ignored when
     *                                       empty or without a resource id.
     */
    public function closeConnection($conn){
        global $game;

        if(!$conn || !$conn->resourceId){
            return;
        }

        $connid = $conn->resourceId;
        $conn->close();

        // Keep every client except the one(s) carrying the closed resource id.
        $remaining = array();
        foreach(self::$clients as $client){
            if($client->resourceId != $connid){
                $remaining[] = $client;
            }
        }
        self::$clients = $remaining;

        $game->query('UPDATE users set connection_id = 0 WHERE connection_id = "'.intval($connid).'" LIMIT 1');
        echo "\n".'Connection '.$connid.' has disconnected';
    }

    /** Connection closed by the peer: run the shared cleanup. */
    public function onClose(ConnectionInterface $conn) {
        echo "\nCLIENT DROPPED";
        $this->closeConnection($conn);
    }

    /** Nothing happens on open; clients register through the "connect" task. */
    public function onOpen(ConnectionInterface $conn) {
    }

    /** Any connection error is answered by closing the connection. */
    public function onError(ConnectionInterface $conn, \Exception $e) {
        echo "\nCLIENT ERRORED";
        $this->closeConnection($conn);
    }

    // NOTE(review): the four handlers below are empty and nothing in this
    // file calls them; they look like WAMP-style callbacks — confirm whether
    // they are still needed.
    public function onSubscribe(ConnectionInterface $conn, $topic) {
    }
    public function onUnSubscribe(ConnectionInterface $conn, $topic) {
    }
    public function onCall(ConnectionInterface $conn, $id, $topic, array $params) {
    }
    public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) {
    }

}

Tutto questo funziona bene finché non creo un thread all'interno del ciclo degli eventi.

Sto affrontando il problema nel modo sbagliato, oppure il multithreading in PHP e i WebSocket sono incompatibili?

+0

Hobbes, hai aggiornamenti su questa domanda? –

+0

Sto ancora aspettando una risposta lol. – Hobbes

+1

Non penso sia necessario implementare il multithreading: se hai letto il codice sorgente di Ratchet e di React, avrai capito che utilizzano funzioni di lettura dei socket non bloccanti. Inoltre, se desideri un piccolo incremento delle prestazioni, potresti dare un'occhiata a libevent. –

risposta

1

Dai un'occhiata a questo pacchetto: https://github.com/huyanping/react-multi-process

Installare

composer require jenner/react-multi-process

Come si usa?

Così semplice come:

// Demo server: listens on 127.0.0.1:4020 and answers every accepted
// connection with the PID of the process that handled it, then closes it.
$loop = React\EventLoop\Factory::create();
$server = stream_socket_server('tcp://127.0.0.1:4020');
stream_set_blocking($server, 0);
$loop->addReadStream($server, function ($server) use ($loop) {
    $conn = stream_socket_accept($server);
    if ($conn === false) {
        // Nothing was accepted (presumably another worker process took the
        // connection first), so there is no stream to register.
        return;
    }
    $data = "pid:" . getmypid() . PHP_EOL;
    $loop->addWriteStream($conn, function ($conn) use (&$data, $loop) {
        $written = fwrite($conn, $data);
        if ($written === false || $written === strlen($data)) {
            // Finished (or the write failed): detach the stream from the
            // loop while the resource is still open, then close it.
            $loop->removeStream($conn);
            fclose($conn);
            return;
        }
        // Partial write: keep only the bytes that have NOT been sent yet.
        $data = substr($data, $written);
    });
});

// the second param is the sub process count
$master = new \React\Multi\Master($loop, 20);
$master->start();

Un esempio che utilizza jenner/simple_fork:

class IoServer {
    /**
     * Event loop driven by run(); run() only requires it to expose a run() method.
     *
     * @var object|null
     */
    public $loop;

    /**
     * Run the application by entering the event loop.
     *
     * With $count <= 1 the loop runs in the current process. With a larger
     * value, a fixed pool of $count worker processes is started and each
     * worker runs the loop.
     *
     * @param int $count worker process count
     * @throws \RuntimeException If a loop was not previously specified
     */
    public function run($count = 1) {
        if (null === $this->loop) {
            throw new \RuntimeException("A React Loop was not provided during instantiation");
        }

        if ($count <= 1) {
            $this->loop->run();
            return;
        }

        $loop = $this->loop;
        // The worker callback uses the captured $loop rather than $this, so
        // it does not depend on the closure being bound to this object.
        $master = new \Jenner\SimpleFork\FixedPool(function() use($loop) {
            $loop->run();
        }, $count);
        $master->start();
        $master->keep(true);
//   or just
//   $master = new \React\Multi\Master($this->loop, $count);
//   $master->start();
    }
}