2012-04-05 15 views
5

Devo passare i messaggi ai processi CLI PHP tramite stdin da Java. Mi piacerebbe mantenere circa 20 processi PHP in esecuzione in un pool, in modo tale che quando passo un messaggio al pool, invia ogni messaggio a un thread separato, mantenendo una coda di messaggi da consegnare. Mi piacerebbe che questi processi di PHP rimanessero in vita il più a lungo possibile, portandone uno nuovo se si muore. Ho guardato a fare questo con un pool di thread statici, ma sembra più progettato per le attività che eseguono e semplicemente muoiono. Come potrei fare questo, con una semplice interfaccia per passare un messaggio al pool? Dovrò implementare il mio "pool di thread" personalizzato?ThreadPool dei processi CLI

+0

Molto simile a questa domanda: http://stackoverflow.com/questions/2592093/php-thread-pool –

+1

Io v'è alcuna uscita dal PHP tale che si sapere quando è finita l'elaborazione? – Clint

+0

Non verrà mai eseguita l'elaborazione. Se uno muore, ho bisogno di generarne uno nuovo per sostituirlo. Trasmetterò loro i dati in modalità round robin tramite stdin. – Will

risposta

4

Fornisco un codice con questo come penso che renderà le cose più chiare. Fondamentalmente è necessario mantenere un pool di oggetti di processo in giro. Siate premurosi che ognuno di questi processi abbia un input, un output e un flusso di errori che è necessario gestire in qualche modo. Nel mio esempio ho appena reindirizzato l'errore e l'output alla console dei processi principale. È possibile impostare callback e gestori per ottenere l'output del programma PHP, se necessario. Se stai solo elaborando attività e non ti interessa cosa dice PHP, lasciatelo così com'è o reindirizza a un file.

Sto utilizzando la libreria Apache Commons Pool per l'ObjectPool. Non c'è bisogno di reinventarne uno.

Avrai un pool di 20 processi che eseguono il tuo programma PHP. Questo da solo non ti procurerà ciò di cui hai bisogno. Potresti voler elaborare le attività contro tutti e 20 questi processi "allo stesso tempo". Quindi avrai anche bisogno di un ThreadPool che estrarrà un Processo dal tuo ObjectPool.

Devi anche capire che se uccidi, o CTRL-C il tuo processo Java il processo init prenderà il controllo dei tuoi processi php e loro siederanno semplicemente lì. Probabilmente vorrai tenere un registro di tutti i pid dei processi PHP che hai generato e poi pulirli se esegui di nuovo il tuo programma Java.

public class StackOverflow_10037379 { 

    private static Logger sLogger = Logger.getLogger(StackOverflow_10037379.class.getName()); 

    public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> { 

     private String mProcessToRun; 

     public CLIPoolableObjectFactory(String processToRun) { 
      mProcessToRun = processToRun; 
     } 

     @Override 
     public Process makeObject() throws Exception { 
      ProcessBuilder builder = new ProcessBuilder(); 
      builder.redirectError(Redirect.INHERIT); 
      // I am being lazy, but really the InputStream is where 
      // you can get any output of the PHP Process. This setting 
      // will make it output to the current processes console. 
      builder.redirectOutput(Redirect.INHERIT); 
      builder.redirectInput(Redirect.PIPE); 
      builder.command(mProcessToRun); 
      return builder.start(); 
     } 

     @Override 
     public boolean validateObject(Process process) { 
      try { 
       process.exitValue(); 
       return false; 
      } catch (IllegalThreadStateException ex) { 
       return true; 
      } 
     } 

     @Override 
     public void destroyObject(Process process) throws Exception { 
      // If PHP has a way to stop it, do that instead of destroy 
      process.destroy(); 
     } 

     @Override 
     public void passivateObject(Process process) throws Exception { 
      // Should really try to read from the InputStream of the Process 
      // to prevent lock-ups if Rediret.INHERIT is not used. 
     } 
    } 

    public static class CLIWorkItem implements Runnable { 

     private ObjectPool<Process> mPool; 
     private String mWork; 

     public CLIWorkItem(ObjectPool<Process> pool, String work) { 
      mPool = pool; 
      mWork = work; 
     } 

     @Override 
     public void run() { 
      Process workProcess = null; 
      try { 
       workProcess = mPool.borrowObject(); 
       OutputStream os = workProcess.getOutputStream(); 
       os.write(mWork.getBytes(Charset.forName("UTF-8"))); 
       os.flush(); 
       // Because of the INHERIT rule with the output stream 
       // the console stream overwrites itself. REMOVE THIS in production. 
       Thread.sleep(100); 
      } catch (Exception ex) { 
       sLogger.log(Level.SEVERE, null, ex); 
      } finally { 
       if (workProcess != null) { 
        try { 
         // Seriously.. so many exceptions. 
         mPool.returnObject(workProcess); 
        } catch (Exception ex) { 
         sLogger.log(Level.SEVERE, null, ex); 
        } 
       } 
      } 
     } 
    } 

    public static void main(String[] args) throws Exception { 

     // Change the 5 to 20 in your case. 
     // Also change mock_php.exe to /usr/bin/php or wherever. 
     ObjectPool<Process> pool = 
       new GenericObjectPool<>(
       new CLIPoolableObjectFactory("mock_php.exe"), 5);   

     // This will only allow you to queue 100 work items at a time. I would suspect 
     // that if you only want 20 PHP processes running at a time and this queue 
     // filled up you'll need to implement some other strategy as you are doing 
     // more work than PHP can keep up with. You'll need to block at some point 
     // or throw work away. 
     BlockingQueue<Runnable> queue = 
      new ArrayBlockingQueue<>(100, true); 

     ThreadPoolExecutor executor = 
      new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue); 

     // print some stuff out. 
     executor.execute(new CLIWorkItem(pool, "Message 1\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 2\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 3\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 4\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 5\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 6\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 7\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 8\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 9\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 10\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 11\r\n")); 

     executor.shutdown(); 
     executor.awaitTermination(4000, TimeUnit.HOURS); 

     pool.close();   
    } 
} 

uscita di Esecuzione programma:

12172 - Message 2 
10568 - Message 1 
4804 - Message 3 
11916 - Message 4 
11116 - Message 5 
12172 - Message 6 
4804 - Message 7 
10568 - Message 8 
11916 - Message 9 
11116 - Message 10 
12172 - Message 11 

programma di codice di C++ per appena uscita quello che era entrata:

#include <windows.h> 
#include <iostream> 
#include <string> 

int main(int argc, char* argv[]) 
{ 
    DWORD pid = GetCurrentProcessId(); 
    std::string line; 
    while (true) {  
     std::getline (std::cin, line); 
     std::cout << pid << " - " << line << std::endl; 
    } 

    return 0; 
} 

Aggiornamento

Scusate il ritardo. Ecco una versione di JDK 6 per chiunque sia interessato. Dovrai eseguire un thread separato per leggere tutto l'input da InputStream del processo. Ho impostato questo codice per generare un nuovo thread lungo ogni nuovo processo. Quel thread legge sempre dal processo fintanto che è vivo. Invece di eseguire l'output direttamente su un file, l'ho impostato in modo tale che utilizzi il framework di registrazione. In questo modo è possibile impostare una configurazione di registrazione per andare su un file, eseguire il rollover, passare alla console ecc. Senza che sia codificato in modo rigido per accedere a un file.

Si noterà che avvio solo un singolo Gobbler per ogni processo anche se un processo ha stdout e stderr. Reindirizza lo stderr allo stdout solo per semplificare le cose. Apparentemente jdk6 supporta solo questo tipo di reindirizzamento.

public class StackOverflow_10037379_jdk6 { 

    private static Logger sLogger = Logger.getLogger(StackOverflow_10037379_jdk6.class.getName()); 

    // Shamelessy taken from Google and modified. 
    // I don't know who the original Author is. 
    public static class StreamGobbler extends Thread { 

     InputStream is; 
     Logger logger; 
     Level level; 

     StreamGobbler(String logName, Level level, InputStream is) { 
      this.is = is; 
      this.logger = Logger.getLogger(logName); 
      this.level = level; 
     } 

     public void run() { 
      try { 
       InputStreamReader isr = new InputStreamReader(is); 
       BufferedReader br = new BufferedReader(isr); 
       String line = null; 
       while ((line = br.readLine()) != null) { 
        logger.log(level, line); 
       } 
      } catch (IOException ex) { 
       logger.log(Level.SEVERE, "Failed to read from Process.", ex); 
      } 
      logger.log(
        Level.INFO, 
        String.format("Exiting Gobbler for %s.", logger.getName())); 
     } 
    } 

    public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> { 

     private String mProcessToRun; 

     public CLIPoolableObjectFactory(String processToRun) { 
      mProcessToRun = processToRun; 
     } 

     @Override 
     public Process makeObject() throws Exception { 
      ProcessBuilder builder = new ProcessBuilder(); 
      builder.redirectErrorStream(true); 
      builder.command(mProcessToRun); 
      Process process = builder.start(); 
      StreamGobbler loggingGobbler = 
        new StreamGobbler(
        String.format("process.%s", process.hashCode()), 
        Level.INFO, 
        process.getInputStream()); 
      loggingGobbler.start(); 
      return process; 
     } 

     @Override 
     public boolean validateObject(Process process) { 
      try { 
       process.exitValue(); 
       return false; 
      } catch (IllegalThreadStateException ex) { 
       return true; 
      } 
     } 

     @Override 
     public void destroyObject(Process process) throws Exception { 
      // If PHP has a way to stop it, do that instead of destroy 
      process.destroy(); 
     } 

     @Override 
     public void passivateObject(Process process) throws Exception { 
      // Should really try to read from the InputStream of the Process 
      // to prevent lock-ups if Rediret.INHERIT is not used. 
     } 
    } 

    public static class CLIWorkItem implements Runnable { 

     private ObjectPool<Process> mPool; 
     private String mWork; 

     public CLIWorkItem(ObjectPool<Process> pool, String work) { 
      mPool = pool; 
      mWork = work; 
     } 

     @Override 
     public void run() { 
      Process workProcess = null; 
      try { 
       workProcess = mPool.borrowObject(); 
       OutputStream os = workProcess.getOutputStream(); 
       os.write(mWork.getBytes(Charset.forName("UTF-8"))); 
       os.flush(); 
       // Because of the INHERIT rule with the output stream 
       // the console stream overwrites itself. REMOVE THIS in production. 
       Thread.sleep(100); 
      } catch (Exception ex) { 
       sLogger.log(Level.SEVERE, null, ex); 
      } finally { 
       if (workProcess != null) { 
        try { 
         // Seriously.. so many exceptions. 
         mPool.returnObject(workProcess); 
        } catch (Exception ex) { 
         sLogger.log(Level.SEVERE, null, ex); 
        } 
       } 
      } 
     } 
    } 

    public static void main(String[] args) throws Exception { 

     // Change the 5 to 20 in your case. 
     ObjectPool<Process> pool = 
       new GenericObjectPool<Process>(
       new CLIPoolableObjectFactory("mock_php.exe"), 5); 

     BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(100, true); 

     ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue); 

     // print some stuff out. 
     executor.execute(new CLIWorkItem(pool, "Message 1\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 2\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 3\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 4\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 5\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 6\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 7\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 8\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 9\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 10\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 11\r\n")); 

     executor.shutdown(); 
     executor.awaitTermination(4000, TimeUnit.HOURS); 

     pool.close(); 
    } 
} 

uscita

Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 9440 - Message 3 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 8776 - Message 2 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 6100 - Message 1 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 10096 - Message 4 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 8868 - Message 5 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 8868 - Message 8 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 6100 - Message 10 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 8776 - Message 9 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 10096 - Message 6 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 9440 - Message 7 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 6100 - Message 11 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: Exiting Gobbler for process.295131993. 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: Exiting Gobbler for process.756434719. 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: Exiting Gobbler for process.332711452. 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: Exiting Gobbler for process.1981440623. 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: Exiting Gobbler for process.1043636732. 
+0

Wow, grazie per la risposta completa! Fare un'implementazione di test basata su questo ora. Lo apprezzo davvero. – Will

+0

Quindi sono su Java6 e non ho Redirect. Come posso impedire allo stdout/stderr del processo di bloccare? Nel mio caso d'uso normale, voglio scrivere su un processo e reindirizzare stdout/stderr per separare i file di log (senza bloccare). – Will

+1

@Will aggiornato con una versione jdk6. –

1

La soluzione migliore è utilizzare le funzioni pcntl per eseguire il fork di un processo, ma la comunicazione tra i processi è difficile. Consiglierei di creare una coda da cui i processi possano leggere, piuttosto che provare a passare messaggi alla riga di comando.

Beanstalk ha diversi client PHP che è possibile utilizzare per gestire la messaggistica tra i processi.

+0

Mi spiace, forse la mia domanda non è chiara - verrà modificata. Questa è una domanda Java. Voglio un threadpool java con processi cli di lunga durata (/ usr/bin/php in questo caso). Devo essere in grado di inviare qualcosa al pool, che verrà quindi scritto su stdin in uno dei processi della CLI. – Will

Problemi correlati