2009-04-17 15 views
24

Avendo due InputStreams in Java, c'è un modo per unirli in modo da terminare con un InputStream che fornisce l'output di entrambi gli stream? Come?Come si uniscono due flussi di input in Java?

+3

Unisci in che modo esattamente? Continua senza interruzioni la lettura da un secondo stream dopo che il primo è stato letto da? Non ho molta familiarità con Java, ma in C# è possibile farlo abbastanza facilmente implementando una classe che eredita da Stream che contiene riferimenti a entrambi i flussi di base e quindi sovrascrive il metodo di lettura. – Noldorin

risposta

37

Come commentato, non è chiaro cosa intendi per unione.

Prendere l'input disponibile "casualmente" da entrambi è complicato da InputStream.available non necessariamente fornendo una risposta utile e il comportamento di blocco dei flussi. Avresti bisogno di due thread per essere letto dagli stream e quindi passare i dati attraverso, per esempio, java.io.Piped(In|Out)putStream (sebbene tali classi abbiano problemi). In alternativa per alcuni tipi di stream può essere possibile utilizzare un'interfaccia diversa, ad esempio i canali non bloccanti java.nio.

Se si desidera l'intero contenuto del primo flusso di input seguito dal secondo: new java.io.SequenceInputStream(s1, s2).

+2

Oh, molto carino, ho appena imparato qualcosa di nuovo. SequenceInputStream è essenzialmente identico al mio CatInputStream, ma utilizza le enumerazioni legacy anziché, ad esempio, una LinkedList. :-) –

+0

Come una modifica alla prima parte della risposta, è difficile da risolvere nel caso generale, ma per casi specifici di FileInputStream (e forse anche socket?) È possibile eseguire l'istanza di/cast e creare un canale al di fuori di esso. (Gli altri flussi possono utilizzare Channels.newChannel per creare un'interfaccia coerente, ma non hanno le qualità non bloccanti richieste.) –

+0

Collections.enumeration è tuo amico. Ho dimenticato una parte della mia prima parte - verrà modificata. –

0

Non che io possa pensare. Probabilmente dovresti leggere il contenuto dei due stream in un byte [] e quindi creare un ByteArrayInputStream da questo.

+0

Upvote per una soluzione semplice, facile da capire e praticabile. Potrebbe non avere il comportamento richiesto se il blocco è importante (o sono enormi). –

4

È possibile scrivere un'implementazione personalizzata InputStream che esegue questa operazione. Esempio:

import java.io.IOException; 
import java.io.InputStream; 
import java.util.Collections; 
import java.util.Deque; 
import java.util.LinkedList; 

public class CatInputStream extends InputStream { 
    private final Deque<InputStream> streams; 

    public CatInputStream(InputStream... streams) { 
     this.streams = new LinkedList<InputStream>(); 
     Collections.addAll(this.streams, streams); 
    } 

    private void nextStream() throws IOException { 
     streams.removeFirst().close(); 
    } 

    @Override 
    public int read() throws IOException { 
     int result = -1; 
     while (!streams.isEmpty() 
       && (result = streams.getFirst().read()) == -1) { 
      nextStream(); 
     } 
     return result; 
    } 

    @Override 
    public int read(byte b[], int off, int len) throws IOException { 
     int result = -1; 
     while (!streams.isEmpty() 
       && (result = streams.getFirst().read(b, off, len)) == -1) { 
      nextStream(); 
     } 
     return result; 
    } 

    @Override 
    public long skip(long n) throws IOException { 
     long skipped = 0L; 
     while (skipped < n && !streams.isEmpty()) { 
      int thisSkip = streams.getFirst().skip(n - skipped); 
      if (thisSkip > 0) 
       skipped += thisSkip; 
      else 
       nextStream(); 
     } 
     return skipped; 
    } 

    @Override 
    public int available() throws IOException { 
     return streams.isEmpty() ? 0 : streams.getFirst().available(); 
    } 

    @Override 
    public void close() throws IOException { 
     while (!streams.isEmpty()) 
      nextStream(); 
    } 
} 

Questo codice non è testato, quindi il tuo chilometraggio può variare.

+0

Questo non fa la stessa cosa di SequenceInputStream, come suggerito da Merzbow? –

+0

Ci scusiamo, ma il tackline ha suggerito SequenceInputStream prima (e io l'ho fatto +1). In SO, vince la prima buona risposta; non si sa mai se le risposte successive sono plagio. Inoltre, leggi il mio commento sulla risposta di tackline per un confronto tra SequenceInputStream e CatInputStream (mi riferisco all'utilizzo di Collections.enumeration). –

+0

Se la persona che ha downvoted la mia risposta lo ha fatto a causa del mio ultimo commento, mi scuso; Dovrei spiegare meglio: se scrivo una risposta che risulta essere un duplicato (o sottoinsieme) di una risposta postata in precedenza, di solito la cancello, sapendo che non riceverò mai punti per essa. Su SO, è davvero "Gun più veloce in Occidente" (cercare il titolo di questa domanda). –

14

java.io.SequenceInputStream potrebbe essere quello che ti serve. Accetta un'enumerazione di flussi e genera i contenuti del primo stream, quindi il secondo e così via finché tutti i flussi non sono vuoti.

0

Ecco un'implementazione MVar specifica per gli array di byte (assicurarsi di aggiungere la propria definizione del pacchetto). Da qui, è banale scrivere un flusso di input su flussi uniti. Posso postare anche questo se richiesto.

import java.nio.ByteBuffer; 

public final class MVar { 

    private static enum State { 
    EMPTY, ONE, MANY 
    } 

    private final Object lock; 

    private State state; 

    private byte b; 

    private ByteBuffer bytes; 
    private int length; 

    public MVar() { 
    lock = new Object(); 
    state = State.EMPTY; 
    } 

    public final void put(byte b) { 
    synchronized (lock) { 
     while (state != State.EMPTY) { 
     try { 
      lock.wait(); 
     } catch (InterruptedException e) {} 
     } 
     this.b = b; 
     state = State.ONE; 
     lock.notifyAll(); 
    } 
    } 

    public final void put(byte[] bytes, int offset, int length) { 
    if (length == 0) { 
     return; 
    } 
    synchronized (lock) { 
     while (state != State.EMPTY) { 
     try { 
      lock.wait(); 
     } catch (InterruptedException e) {} 
     } 
     this.bytes = ByteBuffer.allocateDirect(length); 
     this.bytes.put(bytes, offset, length); 
     this.bytes.position(0); 
     this.length = length; 
     state = State.MANY; 
     lock.notifyAll(); 
    } 
    } 

    public final byte take() { 
    synchronized (lock) { 
     while (state == State.EMPTY) { 
     try { 
      lock.wait(); 
     } catch (InterruptedException e) {} 
     } 
     switch (state) { 
     case ONE: { 
     state = State.EMPTY; 
     byte b = this.b; 
     lock.notifyAll(); 
     return b; 
     } 
     case MANY: { 
     byte b = bytes.get(); 
     state = --length <= 0 ? State.EMPTY : State.MANY; 
     lock.notifyAll(); 
     return b; 
     } 
     default: 
     throw new AssertionError(); 
     } 
    } 
    } 

    public final int take(byte[] bytes, int offset, int length) { 
    if (length == 0) { 
     return 0; 
    } 
    synchronized (lock) { 
     while (state == State.EMPTY) { 
     try { 
      lock.wait(); 
     } catch (InterruptedException e) {} 
     } 
     switch (state) { 
     case ONE: 
     bytes[offset] = b; 
     state = State.EMPTY; 
     lock.notifyAll(); 
     return 1; 
     case MANY: 
     if (this.length > length) { 
      this.bytes.get(bytes, offset, length); 
      this.length = this.length - length; 
      synchronized (lock) { 
      lock.notifyAll(); 
      } 
      return length; 
     } 
     this.bytes.get(bytes, offset, this.length); 
     this.bytes = null; 
     state = State.EMPTY; 
     length = this.length; 
     lock.notifyAll(); 
     return length; 
     default: 
     throw new AssertionError(); 
     } 
    } 
    } 
} 
Problemi correlati