2015-10-28 4 views
14

Sto scrivendo un modulo, che è un flusso scrivibile. Voglio implementare l'interfaccia pipe per i miei utenti.Qual è un modo corretto per sospendere il flusso leggibile tramite pipe da quello scrivibile in nodejs?

Se si verifica un errore, è necessario mettere in pausa il flusso leggibile ed emettere un evento di errore. Quindi, l'utente deciderà, se ha problemi con l'errore, dovrebbe essere in grado di riprendere l'elaborazione dei dati.

var writeable = new BackPressureStream(); 
writeable.on('error', function(error){ 
    console.log(error); 
    writeable.resume(); 
}); 

var readable = require('fs').createReadStream('somefile.txt'); 
readable.pipe.(writeable); 

vedo quel nodo ci fornisce readable.pause() metodo, che può essere utilizzato per mettere in pausa flusso leggibile. Ma io non riesco a come posso chiamare da mio modulo flusso scrivibile:

var Writable = require('stream').Writable; 

function BackPressureStream(options) { 
    Writable.call(this, options); 
} 
require('util').inherits(BackPressureStream, Writable); 

BackPressureStream.prototype._write = function(chunk, encoding, done) { 
    done(); 
}; 

BackPressureStream.prototype.resume = function() { 
    this.emit('drain'); 
} 

Come contropressione può essere implementato in un flusso scrivibile?

P.S. È possibile utilizzare gli eventi pipe/unpipe che forniscono un flusso leggibile come parametro. Ma si dice anche che, per i flussi convogliati, l'unica possibilità di mettere in pausa consiste nell'annullare lo stream leggibile da quello scrivibile.

Ho capito bene? Devo decomprimere il mio stream scrivibile finché le chiamate dell'utente non riprendono? E dopo che le chiamate dell'utente riprenderanno, dovrei reindirizzare lo stream leggibile?

+1

interessati ad avviare una taglia per questo? –

+1

ehi, hai trovato una risposta alla tua domanda? –

risposta

0

Fondamentalmente, come ho capito, si sta cercando di mettere la contropressione sullo stream in caso di un evento di errore. Hai un paio di opzioni.

In primo luogo, come già identificato, utilizzare pipe per afferrare un'istanza del flusso di lettura e fare un po 'di gioco di gambe.

Un'altra opzione è quella di creare un flusso scrivibile incarto che fornisce questa funzionalità (cioè ci vuole un WritableStream come input, e nell'attuazione funzioni di flusso, passa i dati insieme al flusso fornito.

Fondamentalmente si finisce con qualcosa come

source stream -> wrapping writable -> writable

offerte

https://nodejs.org/api/stream.html#stream_implementing_a_writable_stream con l'implementazione di un flusso scrivibile.

La chiave fo È possibile che se si verifica un errore nel codice scrivibile sottostante, si imposta un flag sullo stream e la prossima chiamata a write si verifichi, si memorizzerà il buffer nel buffer, si memorizzerà la richiamata e si chiamerà solo. Qualcosa di simile

// ... 
constructor(wrappedWritableStream) { 
    wrappedWritableStream.on('error', this.errorHandler); 
    this.wrappedWritableStream = wrappedWritableStream; 
} 
// ... 
write(chunk, encoding, callback) { 
    if (this.hadError) { 
     // Note: until callback is called, this function won't be called again, so we will have maximum one stored 
     // chunk. 
     this.bufferedChunk = [chunk, encoding, callback]; 
    } else { 
     wrappedWritableStream.write(chunk, encoding, callback); 
    } 
} 
// ... 
errorHandler(err) { 
    console.error(err); 
    this.hadError = err; 
    this.emit(err); 
} 
// ... 
recoverFromError() { 
    if (this.bufferedChunk) { 
     wrappedWritableStream.write(...this.bufferedChunk); 
     this.bufferedChunk = undefined; 
    } 
    this.hadError = false; 
} 

Nota: Si dovrebbe solo bisogno di implementare la funzione write, ma io vi incoraggio a scavare in giro e giocare con le altre funzioni di implementazione.

Vale anche la pena notare che potresti avere qualche problema a scrivere su stream che hanno emesso un evento di errore, ma te lo lascerò come un problema separato da risolvere.

Ecco un altro buon risorsa su backpressuring https://nodejs.org/en/docs/guides/backpressuring-in-streams/

Problemi correlati