2010-05-06 23 views
12

Possiedo un oggetto Stream che occasionalmente riceve alcuni dati su di esso, ma a intervalli imprevedibili. I messaggi che appaiono sul flusso sono ben definiti e dichiarano in anticipo la dimensione del loro carico utile (la dimensione è un intero di 16 bit contenuto nei primi due byte di ciascun messaggio).Lettura continua da un flusso?

Mi piacerebbe avere una classe StreamWatcher che rileva quando il flusso ha alcuni dati su di esso. Una volta fatto, vorrei che un evento venisse generato in modo che un'istanza StreamProcessor sottoscritta possa elaborare il nuovo messaggio.

Questo può essere fatto con eventi C# senza utilizzare Thread direttamente? Sembra che dovrebbe essere semplice, ma non riesco a ottenere abbastanza la mia testa nel modo giusto per progettare questo.

risposta

0

Sì, questo può essere fatto. Utilizzare il metodo non bloccante Stream.BeginRead con un AsyncCallback. Il callback è chiamato in modo asincrono quando i dati diventano disponibili. Nella richiamata, chiamare Stream.EndRead per ottenere i dati e chiamare di nuovo Stream.BeginRead per ottenere il prossimo blocco di dati. Buffer dati in entrata in un array di byte che è abbastanza grande da contenere il messaggio. Quando l'array di byte è pieno (potrebbero essere necessarie più chiamate di richiamata), sollevare l'evento. Quindi leggi la dimensione del messaggio successivo, crea un nuovo buffer, ripeti, fatto.

+0

Ha detto senza usare fili !!! =) – Nayan

+0

@Nayan: Asynchrony * è * multithreading. Questo problema è intrinsecamente uno del multithreading. Sospetto che l'OP significhi che non vuole creare i thread in modo esplicito. –

+0

@Nayan: Presumo che l'OP cerchi una soluzione che non crei esplicitamente una nuova discussione e utilizzi il metodo di lettura bloccante, ma per una soluzione asincrona non bloccante. Non puoi avere asincronia senza fili. – dtb

1

L'approccio normale consiste nell'utilizzare lo .NET asynchronous programming pattern esposto da Stream. In sostanza, si inizia a leggere in modo asincrono chiamando Stream.BeginRead, passandogli un buffer byte[] e un metodo di richiamata che verrà richiamato quando i dati sono stati letti dallo stream. Nel metodo di callback, si chiama Stream.EndRead, passandogli l'argomento IAsncResult assegnato alla richiamata. Il valore di ritorno di EndRead indica quanti byte sono stati letti nel buffer.

Dopo aver ricevuto i primi pochi byte in questo modo, è possibile attendere il resto del messaggio (se non si sono ottenuti abbastanza dati la prima volta) chiamando di nuovo BeginRead. Una volta ottenuto l'intero messaggio, puoi aumentare l'evento.

11

Quando non dici utilizzare le discussioni direttamente, presumo si vuole ancora li indirettamente utilizzare tramite le chiamate asincrone, altrimenti questo sarebbe non essere molto utile.

Tutto ciò che dovete fare è racchiudere i metodi asincroni dello Stream e memorizzare il risultato in un buffer. In primo luogo, definiamo la parte all'evento della specifica:

public delegate void MessageAvailableEventHandler(object sender, 
    MessageAvailableEventArgs e); 

public class MessageAvailableEventArgs : EventArgs 
{ 
    public MessageAvailableEventArgs(int messageSize) : base() 
    { 
     this.MessageSize = messageSize; 
    } 

    public int MessageSize { get; private set; } 
} 

Ora, leggere un intero a 16 bit dal flusso in modo asincrono e riferire quando è pronto:

public class StreamWatcher 
{ 
    private readonly Stream stream; 

    private byte[] sizeBuffer = new byte[2]; 

    public StreamWatcher(Stream stream) 
    { 
     if (stream == null) 
      throw new ArgumentNullException("stream"); 
     this.stream = stream; 
     WatchNext(); 
    } 

    protected void OnMessageAvailable(MessageAvailableEventArgs e) 
    { 
     var handler = MessageAvailable; 
     if (handler != null) 
      handler(this, e); 
    } 

    protected void WatchNext() 
    { 
     stream.BeginRead(sizeBuffer, 0, 2, new AsyncCallback(ReadCallback), 
      null); 
    } 

    private void ReadCallback(IAsyncResult ar) 
    { 
     int bytesRead = stream.EndRead(ar); 
     if (bytesRead != 2) 
      throw new InvalidOperationException("Invalid message header."); 
     int messageSize = sizeBuffer[1] << 8 + sizeBuffer[0]; 
     OnMessageAvailable(new MessageAvailableEventArgs(messageSize)); 
     WatchNext(); 
    } 

    public event MessageAvailableEventHandler MessageAvailable; 
} 

penso che su di esso. Ciò presuppone che qualsiasi classe stia gestendo il messaggio abbia anche accesso allo Stream ed è pronto a leggerlo, in modo sincrono o asincrono, in base alla dimensione del messaggio nell'evento. Se vuoi che la classe watcher legga effettivamente l'intero messaggio, dovrai aggiungere altro codice per farlo.

+0

+1 per l'implementazione che dimostra che la mia teoria di BeginRead non è la soluzione giusta. La tua implementazione ha chiarito questo dubbio. – Nayan

+0

Glorioso! Sembra che farà il trucco, e lo proverò domani. –

+0

+1. Nitpicking: AFAIK a Stream.Read è consentito restituire meno dei byte di conteggio, purché restituisca almeno un byte (se non viene raggiunta la fine). Quindi se '(bytesRead! = 2)' non dovresti lanciare un'eccezione ma BeginRead di nuovo fino a quando non sono stati letti due byte. – dtb

1

Non si utilizza Stream.BeginRead() come se si stesse utilizzando il metodo sincrono Stream.Read() in un thread separato?