2015-12-01 22 views
5

Ho un processo di back-end costoso per il calcolo in Symfony2/PHP che mi piacerebbe eseguire multi-thread.esegue il ciclo grande con thread paralleli in CLI PHP

Poiché eseguo l'iterazione su migliaia di oggetti, penso che non dovrei iniziare un thread per oggetto. Mi piacerebbe avere una variabile $ core che definisce quanti thread voglio in parallelo, quindi scorrere il ciclo e mantenere attivi molti thread. Quindi, ogni volta che finisce un thread, deve essere avviato uno nuovo con l'oggetto successivo, fino a quando tutti gli oggetti sono stati completati.

Guardando la documentazione di pthreads e facendo alcune ricerche su Google, non riesco a trovare un esempio utilizzabile per questa situazione. Tutti gli esempi che ho trovato hanno un numero fisso di thread che eseguono una volta, nessuno dei quali esegue iterazioni su migliaia di oggetti.

Qualcuno può indicarmi la direzione giusta per iniziare? Comprendo le nozioni di base per impostare un thread e collegarlo, ecc. Ma non come farlo in un ciclo con una condizione di attesa.

+1

php non è veramente progettato per il multi-threading, i documenti pthreads hanno avvertimenti importanti su di esso. L'opzione più semplice sarebbe quella di trasferire tutte le attività in una coda, quindi avviare il numero x di processi (exec) per soddisfare la coda. Un'altra opzione sarebbe quella di dividere le attività in gruppi di (taskcount/x), e di nuovo avviare x processi, passando un gruppo ad ogni processo – Steve

+0

non è sicuro che avere un thread per una data quantità di oggetti sia una buona scelta, si avrà molto di thread se hai molti oggetti e il tuo programma diventerà più lento invece di diventare più veloce. IMHO è per questo che i thread sono corretti negli esempi che hai trovato – Freelancer

+1

Posso suggerirti questo? https://github.com/facile-it/paraunit Sono il mantainer, ho usato Symonfy Process; non è il tuo caso d'uso ma forse può darti un vantaggio ... O potresti dare un'occhiata a https://github.com/liuggio/fastest – Jean

risposta

3

La risposta alla domanda è utilizzare l'astrazione Pool e Worker.

L'idea di base è che si ::submitThreaded oggetti al Pool, che impila sul successivo disponibile Worker, distribuire i vostri Threaded oggetti (round robin) in tutti Workers.

Segue è super semplice codice è per PHP7 (pthreads v3):

<?php 
$jobs = []; 
while (count($jobs) < 2000) { 
    $jobs[] = mt_rand(0, 1999); 
} 

$pool = new Pool(8); 

foreach ($jobs as $job) { 
    $pool->submit(new class($job) extends Threaded { 
     public function __construct(int $job) { 
      $this->job = $job; 
     } 
     public function run() { 
      var_dump($this->job); 
     } 
    }); 
} 

$pool->shutdown(); 
?> 

I posti di lavoro sono inutili, ovviamente. Nel mondo reale, suppongo che il tuo array $jobs continui a crescere, quindi puoi semplicemente scambiare lo per alcuni do {} while e continuare a chiamare ::submit per nuovi lavori.

Nel mondo reale, si desidera raccogliere i rifiuti nello stesso ciclo (basta chiamare Pool::collect senza parametri per il comportamento predefinito).

Degno di nota, niente di tutto questo sarebbe possibile se fosse davvero il caso che PHP non era destinato a lavorare in ambienti multi-threaded ... è sicuramente è.

Questo è la risposta alla domanda, ma non lo rende il migliore soluzione al vostro problema.

Nei commenti si è ipotizzato che 8 thread che eseguono il codice Symfony occupino meno memoria di 8 processi. Questo non è il caso, PHP non è condiviso nulla, tutto il tempo. Ci si può aspettare che 8 thread di Symfony occupino tutta la memoria di 8 processi di Symfony, anzi, un po 'di più. Il vantaggio dell'utilizzo di thread sui processi è che possono comunicare, sincronizzarsi e (sembrare) condividere l'uno con l'altro.

Solo perché è possibile, non significa che dovresti. La soluzione migliore per l'attività in corso è probabilmente quella di utilizzare un pacchetto già pronto o un software destinato a fare ciò che è necessario.

Studiare questa roba così bene da implementare una soluzione robusta è qualcosa che richiederà molto tempo e non vorreste implementare quella prima soluzione ...

Se si decide di ignorare il mio consiglio e provarlo, è possibile trovare molti examples nel repository github per pthreads.

1

Joe ha un buon approccio, ma ho trovato una soluzione diversa altrove che sto usando ora. Fondamentalmente, ho due comandi, un controllo e un comando di lavoro. Il comando di controllo avvia processi in background e controlla i loro risultati:

protected function process($worker, $entity, $timeout=60) { 
    $min = $this->em->createQuery('SELECT MIN(e.id) FROM BM2SiteBundle:'.$entity.' e')->getSingleScalarResult(); 
    $max = $this->em->createQuery('SELECT MAX(e.id) FROM BM2SiteBundle:'.$entity.' e')->getSingleScalarResult(); 

    $batch_size = ceil((($max-$min)+1)/$this->parallel); 
    $pool = array(); 
    for ($i=$min; $i<=$max; $i+=$batch_size) { 
     $builder = new ProcessBuilder(); 
     $builder->setPrefix($this->getApplication()->getKernel()->getRootDir().'/console'); 
     $builder->setArguments(array(
      '--env='.$this->getApplication()->getKernel()->getEnvironment(), 
      'maf:worker:'.$worker, 
      $i, $i+$batch_size-1 
      )); 
     $builder->setTimeout($timeout); 

     $process = $builder->getProcess(); 
     $process->start(); 
     $pool[] = $process; 
    } 
    $this->output->writeln($worker.": started ".count($pool)." jobs"); 
    $running = 99; 
    while ($running > 0) { 
     $running = 0; 
     foreach ($pool as $p) { 
      if ($p->isRunning()) { 
       $running++; 
      } 
     } 
     usleep(250); 
    } 

    foreach ($pool as $p) { 
     if (!$p->isSuccessful()) { 
      $this->output->writeln('fail: '.$p->getExitCode().'/'.$p->getCommandLine()); 
      $this->output->writeln($p->getOutput()); 
     } 
    } 

} 

dove $ this-> parallelo è una variabile ho impostato a 6 sulla mia macchina nucleo 8, significa il numero di processi per iniziare. Si noti che questo metodo richiede di eseguire iterazioni su un'entità specifica (viene divisa in questo modo), che è sempre vera nei casi d'uso.

Non è perfetto, ma avvia processi completamente nuovi anziché thread, che considero la soluzione migliore.

Il comando worker utilizza numeri min e max ID e fa il lavoro effettivo per il set tra questi due.

Questo approccio funziona fintanto che il set di dati è ragionevolmente ben distribuito. Se non si dispone di dati nell'intervallo 1-1000 ma viene utilizzato ogni ID compreso tra 1000 e 2000, i primi tre processi non avrebbero nulla a che fare.

Problemi correlati