2013-05-01 8 views
5

Ho riscontrato un problema con un server multi-threaded che sto creando come esercizio accademico, in particolare con l'ottenimento di una connessione per chiuderla con grazia.Fine con grazia di un thread in attesa su una coda di blocco

Ogni connessione è gestita da una classe Session. Questa classe gestisce 2 thread per la connessione, un DownstreamThread e un UpstreamThread.

I blocchi UpstreamThread sul socket client e codifica tutte le stringhe in entrata in messaggi da passare a un altro livello da gestire. Il DownstreamThread blocca su un BlockingQueue in cui vengono inseriti i messaggi per il client. Quando c'è un messaggio sulla coda, il thread Downstream rimuove il messaggio dalla coda, lo trasforma in una stringa e lo invia al client. Nel sistema finale, un livello applicativo agirà sui messaggi in arrivo e sposterà i messaggi in uscita verso il server per l'invio al client appropriato, ma per ora ho solo una semplice applicazione che dorme per un secondo su un messaggio in arrivo prima di riecheggiare come un messaggio in uscita con un timestamp aggiunto.

Il problema che sto avendo è quello di arrestare il tutto con garbo quando il client si disconnette. Il primo problema con cui sto discutendo è una normale disconnessione, in cui il client consente al server di sapere che sta terminando la connessione con un comando QUIT. Lo pseudocodice di base è:

while (!quitting) { 
    inputString = socket.readLine() // blocks 
    if (inputString != "QUIT") { 
     // forward the message upstream 
     server.acceptMessage (inputString); 
    } else { 
     // Do cleanup 
     quitting = true; 
     socket.close(); 
    } 
} 

Il loop principale del thread upstream esamina la stringa di input. Se è QUIT il thread imposta un flag per dire che il client ha terminato le comunicazioni ed esce dal ciclo. Ciò porta al thread upstream che si chiude correttamente.

Il ciclo principale del thread downstream attende i messaggi nel BlockingQueue fino a quando non viene impostato il flag di chiusura della connessione. Quando lo è, anche il thread downstream dovrebbe terminare. Tuttavia, non lo fa, resta solo lì ad aspettare. La sua psuedocodarlo assomiglia a questo:

while (!quitting) { 
    outputMessage = messageQueue.take(); // blocks 
    sendMessageToClient (outputMessage); 
} 

Quando ho provato questo, ho notato che quando il cliente uscire, il filo a monte spento, ma il filo a valle no.

Dopo un po 'di grattacapo, mi sono reso conto che il thread downstream sta ancora bloccando il BlockingQueue in attesa di un messaggio in arrivo che non arriverà mai. Il thread upstream non inoltra il messaggio QUIT oltre la catena.

Come posso chiudere il thread downstream con garbo? La prima idea che mi è venuta in mente è stata impostare un timeout sulla chiamata take(). Non sono troppo entusiasta di questa idea, perché qualunque valore tu selezioni, è destinato a non essere del tutto soddisfacente. O è troppo lungo e un thread di zombi rimane lì per molto tempo prima di spegnersi, oppure è troppo corto e le connessioni che sono inattive per alcuni minuti ma sono ancora valide verranno uccise. Ho pensato di inviare il messaggio QUIT sulla catena, ma questo richiede che effettui un round trip completo sul server, poi sull'applicazione, quindi di nuovo sul server e alla sessione. Anche questa non sembra una soluzione elegante.

Ho dato un'occhiata alla documentazione per Thread.stop(), ma questo è apparentemente deprecato perché non ha mai funzionato correttamente in ogni caso, quindi sembra che non sia davvero un'opzione neanche. Un'altra idea che ho avuto è stata quella di forzare un'eccezione a essere attivata nel thread downstream in qualche modo e lasciarla ripulire nel suo blocco finale, ma questo mi sembra un'idea orribile e impacciata.

Credo che entrambi i thread dovrebbero essere in grado di spegnersi con grazia da soli, ma ho anche il sospetto che se un thread finisce deve anche segnalare all'altro thread di terminare in un modo più proattivo rispetto alla semplice impostazione di un flag per l'altro filo da controllare.Dato che non ho ancora molta esperienza con Java, sono piuttosto fuori dalle idee a questo punto. Se qualcuno ha qualche consiglio, sarebbe molto apprezzato.

Per motivi di completezza, ho incluso il codice reale per la classe Session di seguito, anche se credo che i frammenti pseudocodice di cui sopra coprano le parti rilevanti del problema. La classe completa è di circa 250 linee.

import java.io.*; 
import java.net.*; 
import java.util.concurrent.*; 
import java.util.logging.*; 

/** 
* Session class 
* 
* A session manages the individual connection between a client and the server. 
* It accepts input from the client and sends output to the client over the 
* provided socket. 
* 
*/ 
public class Session { 
    private Socket    clientSocket = null; 
    private Server    server   = null; 
    private Integer    sessionId  = 0; 
    private DownstreamThread downstream  = null; 
    private UpstreamThread  upstream  = null; 
    private boolean    sessionEnding = false; 

    /** 
    * This thread handles waiting for messages from the server and sending 
    * them to the client 
    */ 
    private class DownstreamThread implements Runnable { 
     private BlockingQueue<DownstreamMessage> incomingMessages = null; 
     private OutputStreamWriter     streamWriter  = null; 
     private Session        outer    = null; 

     @Override 
     public void run() { 
      DownstreamMessage message; 
      Thread.currentThread().setName ("DownstreamThread_" + outer.getId()); 

      try { 
       // Send connect message 
       this.sendMessageToClient ("Hello, you are client " + outer.getId()); 

       while (!outer.sessionEnding) { 
        message = this.incomingMessages.take(); 
        this.sendMessageToClient (message.getPayload()); 
       } 

       // Send disconnect message 
       this.sendMessageToClient ("Goodbye, client " + getId()); 

      } catch (InterruptedException | IOException ex) { 
       Logger.getLogger (DownstreamThread.class.getName()).log (Level.SEVERE, ex.getMessage(), ex); 
      } finally { 
       this.terminate(); 
      } 
     } 

     /** 
     * Add a message to the downstream queue 
     * 
     * @param message 
     * @return 
     * @throws InterruptedException 
     */ 
     public DownstreamThread acceptMessage (DownstreamMessage message) throws InterruptedException { 
      if (!outer.sessionEnding) { 
       this.incomingMessages.put (message); 
      } 

      return this; 
     } 

     /** 
     * Send the given message to the client 
     * 
     * @param message 
     * @throws IOException 
     */ 
     private DownstreamThread sendMessageToClient (CharSequence message) throws IOException { 
      OutputStreamWriter osw; 
      // Output to client 
      if (null != (osw = this.getStreamWriter())) { 
       osw.write ((String) message); 
       osw.write ("\r\n"); 
       osw.flush(); 
      } 

      return this; 
     } 

     /** 
     * Perform session cleanup 
     * 
     * @return 
     */ 
     private DownstreamThread terminate() { 
      try { 
       this.streamWriter.close(); 
      } catch (IOException ex) { 
       Logger.getLogger (DownstreamThread.class.getName()).log (Level.SEVERE, ex.getMessage(), ex); 
      } 
      this.streamWriter = null; 

      return this; 
     } 

     /** 
     * Get an output stream writer, initialize it if it's not active 
     * 
     * @return A configured OutputStreamWriter object 
     * @throws IOException 
     */ 
     private OutputStreamWriter getStreamWriter() throws IOException { 
      if ((null == this.streamWriter) 
      && (!outer.sessionEnding)) { 
       BufferedOutputStream os = new BufferedOutputStream (outer.clientSocket.getOutputStream()); 
       this.streamWriter  = new OutputStreamWriter (os, "UTF8"); 
      } 

      return this.streamWriter; 
     } 

     /** 
     * 
     * @param outer 
     */ 
     public DownstreamThread (Session outer) { 
      this.outer    = outer; 
      this.incomingMessages = new LinkedBlockingQueue(); 
      System.out.println ("Class " + this.getClass() + " created"); 
     } 
    } 

    /** 
    * This thread handles waiting for client input and sending it upstream 
    */ 
    private class UpstreamThread implements Runnable { 
     private Session outer = null; 

     @Override 
     public void run() { 
      StringBuffer inputBuffer = new StringBuffer(); 
      BufferedReader inReader; 

      Thread.currentThread().setName ("UpstreamThread_" + outer.getId()); 

      try { 
       inReader = new BufferedReader (new InputStreamReader (outer.clientSocket.getInputStream(), "UTF8")); 

       while (!outer.sessionEnding) { 
        // Read whatever was in the input buffer 
        inputBuffer.delete (0, inputBuffer.length()); 
        inputBuffer.append (inReader.readLine()); 
        System.out.println ("Input message was: " + inputBuffer); 

        if (!inputBuffer.toString().equals ("QUIT")) { 
         // Forward the message up the chain to the Server 
         outer.server.acceptMessage (new UpstreamMessage (sessionId, inputBuffer.toString())); 
        } else { 
         // End the session 
         outer.sessionEnding = true; 
        } 
       } 

      } catch (IOException | InterruptedException e) { 
       Logger.getLogger (Session.class.getName()).log (Level.SEVERE, e.getMessage(), e); 
      } finally { 
       outer.terminate(); 
       outer.server.deleteSession (outer.getId()); 
      } 
     } 

     /** 
     * Class constructor 
     * 
     * The Core Java volume 1 book said that a constructor such as this 
     * should be implicitly created, but that doesn't seem to be the case! 
     * 
     * @param outer 
     */ 
     public UpstreamThread (Session outer) { 
      this.outer = outer; 
      System.out.println ("Class " + this.getClass() + " created"); 
     } 
    } 

    /** 
    * Start the session threads 
    */ 
    public void run() //throws InterruptedException 
    { 
     Thread upThread  = new Thread (this.upstream); 
     Thread downThread = new Thread (this.downstream); 

     upThread.start(); 
     downThread.start(); 
    } 

    /** 
    * Accept a message to send to the client 
    * 
    * @param message 
    * @return 
    * @throws InterruptedException 
    */ 
    public Session acceptMessage (DownstreamMessage message) throws InterruptedException { 
     this.downstream.acceptMessage (message); 
     return this; 
    } 

    /** 
    * Accept a message to send to the client 
    * 
    * @param message 
    * @return 
    * @throws InterruptedException 
    */ 
    public Session acceptMessage (String message) throws InterruptedException { 
     return this.acceptMessage (new DownstreamMessage (this.getId(), message)); 
    } 

    /** 
    * Terminate the client connection 
    */ 
    private void terminate() { 
     try { 
      this.clientSocket.close(); 
     } catch (IOException e) { 
      Logger.getLogger (Session.class.getName()).log (Level.SEVERE, e.getMessage(), e); 
     } 
    } 

    /** 
    * Get this Session's ID 
    * 
    * @return The ID of this session 
    */ 
    public Integer getId() { 
     return this.sessionId; 
    } 

    /** 
    * Session constructor 
    * 
    * @param owner The Server object that owns this session 
    * @param sessionId The unique ID this session will be given 
    * @throws IOException 
    */ 
    public Session (Server owner, Socket clientSocket, Integer sessionId) throws IOException { 

     this.server   = owner; 
     this.clientSocket = clientSocket; 
     this.sessionId  = sessionId; 
     this.upstream  = new UpstreamThread (this); 
     this.downstream  = new DownstreamThread (this); 

     System.out.println ("Class " + this.getClass() + " created"); 
     System.out.println ("Session ID is " + this.sessionId); 
    } 
} 

risposta

3

Invece di chiamare Thread.stop uso Thread.interrupt. Ciò causerà il metodo take per generare un valore InterruptedException che è possibile utilizzare per sapere che è necessario arrestare.

+1

Io ci ho pensato, ma non sta forzando qualcosa da un'eccezione come parte di arresto normale un po 'di un kludge? – GordonM

+0

No, questo è il modo in cui è progettato per funzionare. Altrimenti, ogni metodo che potrebbe essere interrotto dovrebbe restituire uno speciale valore interrotto. Vedi http://docs.oracle.com/javase/tutorial/essential/concurrency/interrupt.html per ulteriori informazioni. – Brigham

+0

Non so molto su Java, ma cosa succederà quando chiami 'Thread.interrupt' e non stiamo bloccando il metodo' take'? Penso che l'interruzione non verrà sollevata e dobbiamo controllare 'Thread.isInterrupted', credo. – Zuljin

0

È sufficiente creare un messaggio di chiusura "falso" invece di impostare outer.sessionEnding su true quando viene visualizzato "ESCI". Mettere questo falso messaggio in coda attiverà il DownstreamThread e potrai terminarlo. In tal caso puoi persino eliminare questa variabile di sessione.

In pseudo codice questo potrebbe essere la seguente:

while (true) { 
    outputMessage = messageQueue.take(); // blocks 
    if (QUIT == outputMessage) 
     break 
    sendMessageToClient (outputMessage); 
} 
Problemi correlati