2011-05-11 10 views
6

Sto implementando un layer orientato agli eventi su Java Sockets e mi chiedevo se ci fosse un modo per determinare se ci sono dati in sospeso da leggere.Java networking: evented Socket/InputStream

Il mio approccio normale sarebbe quello di leggere dal socket in un buffer e chiamare i callback forniti quando il buffer viene riempito su una determinata quantità di byte (che potrebbe essere 0, se il callback deve essere attivato ogni volta qualcosa arriva), ma sospetto che Java stia già facendo il buffering per me.

Il metodo available() di InputStream è affidabile per questo? Dovrei solo read() e fare il mio buffering sulla parte superiore del socket? oppure c'è un'altro modo?

risposta

8

In breve, no. available() non è affidabile (almeno non era per me). Raccomando di utilizzare java.nio.channels.SocketChannel collegato a Selector e SelectionKey. Questa soluzione è in qualche modo basata sugli eventi, ma è più complicata delle semplici prese.

Per clienti:

  1. canale presa Costrutto (socket), aprire un selettore (selector = Selector.open();).
  2. Usa non-blocking socket.configureBlocking(false);
  3. selettore Registrati per connessioni socket.register(selector, SelectionKey.OP_CONNECT);
  4. Collegare socket.connect(new InetSocketAddress(host, port));
  5. Vedere se c'è qualcosa di nuovo selector.select();
  6. Se il "nuovo" si riferisce alla connessione, registrare il selettore per OP_READ; se il "nuovo" si riferisce ai dati disponibili, basta leggere dal socket.

Tuttavia, per averlo in modo asincrono è necessario impostare un thread separato (nonostante il socket venga creato come non bloccato, il thread si bloccherà comunque) che controlla se qualcosa è arrivato o meno.

Per i server, c'è ServerSocketChannel e si utilizza OP_ACCEPT per questo.

Per avere un riferimento, questo è il mio codice (client), dovrebbe dare un suggerimento:

private Thread readingThread = new ListeningThread(); 

/** 
    * Listening thread - reads messages in a separate thread so the application does not get blocked. 
    */ 
private class ListeningThread extends Thread { 
    public void run() { 
    running = true; 
    try { 
    while(!close) listen(); 
    messenger.close(); 
    } 
    catch(ConnectException ce) { 
    doNotifyConnectionFailed(ce); 
    } 
    catch(Exception e) { 
// e.printStackTrace(); 
    messenger.close(); 
    } 
    running = false; 
    } 
} 

/** 
    * Connects to host and port. 
    * @param host Host to connect to. 
    * @param port Port of the host machine to connect to. 
    */ 
public void connect(String host, int port) { 
    try { 
    SocketChannel socket = SocketChannel.open(); 
    socket.configureBlocking(false); 
    socket.register(this.selector, SelectionKey.OP_CONNECT); 
    socket.connect(new InetSocketAddress(host, port)); 
    } 
    catch(IOException e) { 
    this.doNotifyConnectionFailed(e); 
    } 
} 

/** 
    * Waits for an event to happen, processes it and then returns. 
    * @throws IOException when something goes wrong. 
    */ 
protected void listen() throws IOException { 
    // see if there are any new things going on 
    this.selector.select(); 
    // process events 
    Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); 
    while(iter.hasNext()) { 
    SelectionKey key = iter.next(); 
    iter.remove(); 
    // check validity 
    if(key.isValid()) { 
    // if connectable... 
    if(key.isConnectable()) { 
    // ...establish connection, make messenger, and notify everyone 
    SocketChannel client = (SocketChannel)key.channel(); 
    // now this is tricky, registering for OP_READ earlier causes the selector not to wait for incoming bytes, which results in 100% cpu usage very, very fast 
    if(client!=null && client.finishConnect()) { 
     client.register(this.selector, SelectionKey.OP_READ); 
    } 
    } 
    // if readable, tell messenger to read bytes 
    else if(key.isReadable() && (SocketChannel)key.channel()==this.messenger.getSocket()) { 
    // read message here 
    } 
    } 
    } 
} 

/** 
    * Starts the client. 
    */ 
public void start() { 
    // start a reading thread 
    if(!this.running) { 
    this.readingThread = new ListeningThread(); 
    this.readingThread.start(); 
    } 
} 

/** 
    * Tells the client to close at nearest possible moment. 
    */ 
public void close() { 
    this.close = true; 
} 

E per server:

/** 
    * Constructs a server. 
    * @param port Port to listen to. 
    * @param protocol Protocol of messages. 
    * @throws IOException when something goes wrong. 
    */ 
public ChannelMessageServer(int port) throws IOException { 
    this.server = ServerSocketChannel.open(); 
    this.server.configureBlocking(false); 
    this.server.socket().bind(new InetSocketAddress(port)); 
    this.server.register(this.selector, SelectionKey.OP_ACCEPT); 
} 

/** 
    * Waits for event, then exits. 
    * @throws IOException when something goes wrong. 
    */ 
protected void listen() throws IOException { 
    // see if there are any new things going on 
    this.selector.select(); 
    // process events 
    Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); 
    while(iter.hasNext()) { 
    SelectionKey key = iter.next(); 
    // do something with the connected socket 
    iter.remove(); 
    if(key.isValid()) this.process(key); 
    } 
} 

/** 
    * Processes a selection key. 
    * @param key SelectionKey. 
    * @throws IOException when something is wrong. 
    */ 
protected void process(SelectionKey key) throws IOException { 
    // if incoming connection 
    if(key.isAcceptable()) { 
    // get client 
    SocketChannel client = (((ServerSocketChannel)key.channel()).accept()); 
    try { 
    client.configureBlocking(false); 
    client.register(this.selector, SelectionKey.OP_READ); 
    } 
    catch(Exception e) { 
    // catch 
    } 
    } 
    // if readable, tell messenger to read 
    else if(key.isReadable()) { 
    // read 
    } 
} 

Spero che questo aiuti.

+0

Non capisco. Non hai bisogno di un thread separato.I socket non bloccanti non vengono bloccati per definizione. Utilizzare correttamente OP_READ e un ciclo di lettura corretto che si arresta quando read restituisce zero. – EJP

+0

@EJP: non in disaccordo; tuttavia mi sembrava che, a prescindere dal blocco, la lettura dalla presa continuasse a bloccarsi, anche se non c'era nulla da leggere. Potrebbe essere che ho fatto qualcosa di sbagliato, però. Suggerisco al richiedente di provare come dici tu, e nel caso in cui non funzioni, prova i thread. – Sorrow

+0

quello che quasi certamente ha fatto è stato il ciclo mentre read() ha restituito zero. Ecco perché l'ho menzionato. Questo non sta bloccando, è un loop. – EJP

0

disponibile() vi dirà solo se è possibile leggere i dati senza passare al sistema operativo. Non è molto utile qui.

È possibile eseguire una lettura bloccante o non bloccante come si preferisce. Una lettura non bloccante ritorna solo quando non ci sono dati da leggere, quindi potrebbe essere quello che vuoi.

+2

errato. available() ti dirà la somma dei dati in BufferedInputStream/BufferedReader, se ne stai usando uno, e il socket riceve il buffer, che è una struttura dati del kernel. Se i dati sono solo nel buffer di ricezione del socket, dovresti "andare al sistema operativo" per ottenerlo, ma non lo * bloccherai * nel processo. Come dice il Javadoc. Tuttavia, se è ad esempio un SSLSocket, available() restituisce sempre zero. – EJP

Problemi correlati