Ho riscontrato un problema con un server multi-threaded che sto creando come esercizio accademico, in particolare con l'ottenimento di una connessione per chiuderla con grazia.Fine con grazia di un thread in attesa su una coda di blocco
Ogni connessione è gestita da una classe Session. Questa classe gestisce 2 thread per la connessione, un DownstreamThread e un UpstreamThread.
I blocchi UpstreamThread sul socket client e codifica tutte le stringhe in entrata in messaggi da passare a un altro livello da gestire. Il DownstreamThread blocca su un BlockingQueue in cui vengono inseriti i messaggi per il client. Quando c'è un messaggio sulla coda, il thread Downstream rimuove il messaggio dalla coda, lo trasforma in una stringa e lo invia al client. Nel sistema finale, un livello applicativo agirà sui messaggi in arrivo e sposterà i messaggi in uscita verso il server per l'invio al client appropriato, ma per ora ho solo una semplice applicazione che dorme per un secondo su un messaggio in arrivo prima di riecheggiare come un messaggio in uscita con un timestamp aggiunto.
Il problema che sto avendo è quello di arrestare il tutto con garbo quando il client si disconnette. Il primo problema con cui sto discutendo è una normale disconnessione, in cui il client consente al server di sapere che sta terminando la connessione con un comando QUIT. Lo pseudocodice di base è:
while (!quitting) {
inputString = socket.readLine() // blocks
if (inputString != "QUIT") {
// forward the message upstream
server.acceptMessage (inputString);
} else {
// Do cleanup
quitting = true;
socket.close();
}
}
Il loop principale del thread upstream esamina la stringa di input. Se è QUIT il thread imposta un flag per dire che il client ha terminato le comunicazioni ed esce dal ciclo. Ciò porta al thread upstream che si chiude correttamente.
Il ciclo principale del thread downstream attende i messaggi nel BlockingQueue fino a quando non viene impostato il flag di chiusura della connessione. Quando lo è, anche il thread downstream dovrebbe terminare. Tuttavia, non lo fa, resta solo lì ad aspettare. La sua psuedocodarlo assomiglia a questo:
while (!quitting) {
outputMessage = messageQueue.take(); // blocks
sendMessageToClient (outputMessage);
}
Quando ho provato questo, ho notato che quando il cliente uscire, il filo a monte spento, ma il filo a valle no.
Dopo un po 'di grattacapo, mi sono reso conto che il thread downstream sta ancora bloccando il BlockingQueue in attesa di un messaggio in arrivo che non arriverà mai. Il thread upstream non inoltra il messaggio QUIT oltre la catena.
Come posso chiudere il thread downstream con garbo? La prima idea che mi è venuta in mente è stata impostare un timeout sulla chiamata take(). Non sono troppo entusiasta di questa idea, perché qualunque valore tu selezioni, è destinato a non essere del tutto soddisfacente. O è troppo lungo e un thread di zombi rimane lì per molto tempo prima di spegnersi, oppure è troppo corto e le connessioni che sono inattive per alcuni minuti ma sono ancora valide verranno uccise. Ho pensato di inviare il messaggio QUIT sulla catena, ma questo richiede che effettui un round trip completo sul server, poi sull'applicazione, quindi di nuovo sul server e alla sessione. Anche questa non sembra una soluzione elegante.
Ho dato un'occhiata alla documentazione per Thread.stop(), ma questo è apparentemente deprecato perché non ha mai funzionato correttamente in ogni caso, quindi sembra che non sia davvero un'opzione neanche. Un'altra idea che ho avuto è stata quella di forzare un'eccezione a essere attivata nel thread downstream in qualche modo e lasciarla ripulire nel suo blocco finale, ma questo mi sembra un'idea orribile e impacciata.
Credo che entrambi i thread dovrebbero essere in grado di spegnersi con grazia da soli, ma ho anche il sospetto che se un thread finisce deve anche segnalare all'altro thread di terminare in un modo più proattivo rispetto alla semplice impostazione di un flag per l'altro filo da controllare.Dato che non ho ancora molta esperienza con Java, sono piuttosto fuori dalle idee a questo punto. Se qualcuno ha qualche consiglio, sarebbe molto apprezzato.
Per motivi di completezza, ho incluso il codice reale per la classe Session di seguito, anche se credo che i frammenti pseudocodice di cui sopra coprano le parti rilevanti del problema. La classe completa è di circa 250 linee.
import java.io.*;
import java.net.*;
import java.util.concurrent.*;
import java.util.logging.*;
/**
* Session class
*
* A session manages the individual connection between a client and the server.
* It accepts input from the client and sends output to the client over the
* provided socket.
*
*/
public class Session {
private Socket clientSocket = null;
private Server server = null;
private Integer sessionId = 0;
private DownstreamThread downstream = null;
private UpstreamThread upstream = null;
private boolean sessionEnding = false;
/**
* This thread handles waiting for messages from the server and sending
* them to the client
*/
private class DownstreamThread implements Runnable {
private BlockingQueue<DownstreamMessage> incomingMessages = null;
private OutputStreamWriter streamWriter = null;
private Session outer = null;
@Override
public void run() {
DownstreamMessage message;
Thread.currentThread().setName ("DownstreamThread_" + outer.getId());
try {
// Send connect message
this.sendMessageToClient ("Hello, you are client " + outer.getId());
while (!outer.sessionEnding) {
message = this.incomingMessages.take();
this.sendMessageToClient (message.getPayload());
}
// Send disconnect message
this.sendMessageToClient ("Goodbye, client " + getId());
} catch (InterruptedException | IOException ex) {
Logger.getLogger (DownstreamThread.class.getName()).log (Level.SEVERE, ex.getMessage(), ex);
} finally {
this.terminate();
}
}
/**
* Add a message to the downstream queue
*
* @param message
* @return
* @throws InterruptedException
*/
public DownstreamThread acceptMessage (DownstreamMessage message) throws InterruptedException {
if (!outer.sessionEnding) {
this.incomingMessages.put (message);
}
return this;
}
/**
* Send the given message to the client
*
* @param message
* @throws IOException
*/
private DownstreamThread sendMessageToClient (CharSequence message) throws IOException {
OutputStreamWriter osw;
// Output to client
if (null != (osw = this.getStreamWriter())) {
osw.write ((String) message);
osw.write ("\r\n");
osw.flush();
}
return this;
}
/**
* Perform session cleanup
*
* @return
*/
private DownstreamThread terminate() {
try {
this.streamWriter.close();
} catch (IOException ex) {
Logger.getLogger (DownstreamThread.class.getName()).log (Level.SEVERE, ex.getMessage(), ex);
}
this.streamWriter = null;
return this;
}
/**
* Get an output stream writer, initialize it if it's not active
*
* @return A configured OutputStreamWriter object
* @throws IOException
*/
private OutputStreamWriter getStreamWriter() throws IOException {
if ((null == this.streamWriter)
&& (!outer.sessionEnding)) {
BufferedOutputStream os = new BufferedOutputStream (outer.clientSocket.getOutputStream());
this.streamWriter = new OutputStreamWriter (os, "UTF8");
}
return this.streamWriter;
}
/**
*
* @param outer
*/
public DownstreamThread (Session outer) {
this.outer = outer;
this.incomingMessages = new LinkedBlockingQueue();
System.out.println ("Class " + this.getClass() + " created");
}
}
/**
* This thread handles waiting for client input and sending it upstream
*/
private class UpstreamThread implements Runnable {
private Session outer = null;
@Override
public void run() {
StringBuffer inputBuffer = new StringBuffer();
BufferedReader inReader;
Thread.currentThread().setName ("UpstreamThread_" + outer.getId());
try {
inReader = new BufferedReader (new InputStreamReader (outer.clientSocket.getInputStream(), "UTF8"));
while (!outer.sessionEnding) {
// Read whatever was in the input buffer
inputBuffer.delete (0, inputBuffer.length());
inputBuffer.append (inReader.readLine());
System.out.println ("Input message was: " + inputBuffer);
if (!inputBuffer.toString().equals ("QUIT")) {
// Forward the message up the chain to the Server
outer.server.acceptMessage (new UpstreamMessage (sessionId, inputBuffer.toString()));
} else {
// End the session
outer.sessionEnding = true;
}
}
} catch (IOException | InterruptedException e) {
Logger.getLogger (Session.class.getName()).log (Level.SEVERE, e.getMessage(), e);
} finally {
outer.terminate();
outer.server.deleteSession (outer.getId());
}
}
/**
* Class constructor
*
* The Core Java volume 1 book said that a constructor such as this
* should be implicitly created, but that doesn't seem to be the case!
*
* @param outer
*/
public UpstreamThread (Session outer) {
this.outer = outer;
System.out.println ("Class " + this.getClass() + " created");
}
}
/**
* Start the session threads
*/
public void run() //throws InterruptedException
{
Thread upThread = new Thread (this.upstream);
Thread downThread = new Thread (this.downstream);
upThread.start();
downThread.start();
}
/**
* Accept a message to send to the client
*
* @param message
* @return
* @throws InterruptedException
*/
public Session acceptMessage (DownstreamMessage message) throws InterruptedException {
this.downstream.acceptMessage (message);
return this;
}
/**
* Accept a message to send to the client
*
* @param message
* @return
* @throws InterruptedException
*/
public Session acceptMessage (String message) throws InterruptedException {
return this.acceptMessage (new DownstreamMessage (this.getId(), message));
}
/**
* Terminate the client connection
*/
private void terminate() {
try {
this.clientSocket.close();
} catch (IOException e) {
Logger.getLogger (Session.class.getName()).log (Level.SEVERE, e.getMessage(), e);
}
}
/**
* Get this Session's ID
*
* @return The ID of this session
*/
public Integer getId() {
return this.sessionId;
}
/**
* Session constructor
*
* @param owner The Server object that owns this session
* @param sessionId The unique ID this session will be given
* @throws IOException
*/
public Session (Server owner, Socket clientSocket, Integer sessionId) throws IOException {
this.server = owner;
this.clientSocket = clientSocket;
this.sessionId = sessionId;
this.upstream = new UpstreamThread (this);
this.downstream = new DownstreamThread (this);
System.out.println ("Class " + this.getClass() + " created");
System.out.println ("Session ID is " + this.sessionId);
}
}
Io ci ho pensato, ma non sta forzando qualcosa da un'eccezione come parte di arresto normale un po 'di un kludge? – GordonM
No, questo è il modo in cui è progettato per funzionare. Altrimenti, ogni metodo che potrebbe essere interrotto dovrebbe restituire uno speciale valore interrotto. Vedi http://docs.oracle.com/javase/tutorial/essential/concurrency/interrupt.html per ulteriori informazioni. – Brigham
Non so molto su Java, ma cosa succederà quando chiami 'Thread.interrupt' e non stiamo bloccando il metodo' take'? Penso che l'interruzione non verrà sollevata e dobbiamo controllare 'Thread.isInterrupted', credo. – Zuljin