2016-04-12 12 views
8

C'è un modo per collegare un flusso leggibile a uno stream scrivibile in Node.js, dove il writable non è ancora pronto per ricevere i dati? In altre parole, mi piacerebbe connettere il leggibile a quello scrivibile, ma voglio inizializzare il file scrivibile, compresa la definizione del metodo di scrittura, in un momento successivo del programma. Forse dobbiamo implementare il metodo di scrittura, ma esiste un modo per mettere in pausa un flusso scrivibile in modo simile in cui è possibile mettere in pausa un flusso leggibile? O forse possiamo usare un flusso intermedio/di trasformazione e bufferizzare i dati lì, prima di reindirizzare i dati a quelli scrivibili!Trasmissione dei dati allo stream scrivibile che non è ancora pronto per ricevere i dati

A titolo di esempio, di solito facciamo:

readable.pipe(transform).pipe(writable); 

ma voglio fare qualcosa di simile:

const tstrm = readable.pipe(transform); 

doSomethingAsync().then(function(){ 

     tstrm.pipe(writable); 

}); 

chiedo solo se questo è possibile e come farlo bene, finora avendo problemi a capire entrambi.

Immagino che sto cercando di bufferizzare i dati in un flusso di trasformazione intermedio, prima che sia collegato/inviato a un flusso scrivibile, e successivamente, una volta connesso, esegue lo streaming dei dati memorizzati prima di ogni nuovo dato. Sembra una cosa ragionevole da fare, non riesco a trovare alcuna informazione su questo.

+0

si presenta come la risposta accettata qui è vicino a quello che sto cercando http://stackoverflow.com/questions/20317759/implementing-a-buffered-transform-stream –

risposta

2

Nota, sto usando un intervallo qui per simulare che lo scrittore sia in grado di leggere o meno. Lo si può fare in qualsiasi modo si desidera cioè se lo scrittore restituisce false si dovrebbe aggiornare lo stato di avviare il buffering ecc Credo che l'ultima riga è ciò che si vuole cioè

r.pipe(b).pipe(w); 

Questo si legge come segue

readStrem.pipe(transformBbuffer).pipe(writeStream); 

Il codice di esempio, ci sono alcune modifiche che possiamo apportare al buffer di tutti i dati. Descriverò dopo il codice. Tutto quello che c'è da sapere su corsi d'acqua e sei più nella documentazione, penso che potevano fare con gli esempi più completi ma sono abbastanza buono come è ...

https://nodejs.org/api/stream.html#stream_class_stream_transform_1

Questo il codice.

var fs  = require('fs'); 
var stream = require('stream') 
const util = require('util'); 
//const StringDecoder = require('string_decoder').StringDecoder; 
const Transform = require('stream').Transform; 
var check_buff = 0; 
var DRAIN_ME = 0; 

var r = fs.createReadStream('file1.txt').setEncoding('utf8'); 
var w = fs.createWriteStream('file2.txt'); 

var BufferStream = function() { 
    stream.Transform.apply(this, arguments); 
    this.buffer = []; 
}; 

util.inherits(BufferStream, stream.Transform); 

var intId; 
intId = setInterval(function(){ 
    if(check_buff % 3 == 0) { 
    DRAIN_ME = 1; 
    return; 
    } 
    DRAIN_ME = 0; 
},10); 

BufferStream.prototype._transform = function (chunk, encoding, done) { 
    this.buffer.push(String(chunk)); 
    while(DRAIN_ME > 0 && this.buffer.length > 0) { 
    this.push(this.buffer.shift()); 
    } 
    console.log(chunk.length); 
    console.log(this.buffer.length); 
    done(); 
}; 

var b = new BufferStream(); 
b.on('end', function(chunk) { 
    clearInterval(intId); 
}); 
r.pipe(b).pipe(w); 

Sto cercando il modo canonico di implementare una trasformazione/attraverso flusso, che memorizza tutti i dati fino a quando il tubo è chiamata su di esso.

apportare le seguenti modifiche

BufferStream.prototype._transform = function (chunk, encoding, done) { 
    this.buffer.push(String(chunk)); 

    console.log(chunk.length); 
    console.log(this.buffer.length); 
    done(); 
}; 
...... 
BufferStream.prototype._flush = function (cb) { 
    var len = this.buffer.length; 
    for (var i = 0; i < len; i++) { 
    this.push(this.buffer.shift()); 
    }; 
    cb(); 
}; 

Si può anche mettere in pausa il flusso leggibile che in effetti mettere in pausa il flusso scrivibile perché si ferma la ricezione dei dati ad esempio ...

Per testare questo creare un file abbastanza grande su disco cioè 100 MB o più ed esegui questo ...

var fs = require('fs'); 
var readableStream = fs.createReadStream('file1.txt'); 
var writableStream = fs.createWriteStream('file2.txt'); 

readableStream.setEncoding('utf8'); 

readableStream.on('data', function(chunk) { 
    var ready = 0; 
    readableStream.pause(); 
    setInterval(function(){ 
    if(ready == 0) { 
     //console.log('pausing'); 
     readableStream.pause(); 
     ready = 1; 
    } 
    else { 
     //console.log('resuming'); 
     readableStream.resume(); 
     ready = 0; 
    } 
    },100); 
    writableStream.write(chunk); 
}); 

Il motivo della pausa immediata è perché nel momento in cui è scattato l'intervallo 10ms il file potrebbe essere già stato scritto. Ci sono variazioni su questo cioè ...

var fs = require('fs'); 
var readableStream = fs.createReadStream('file1.txt'); 
var writableStream = fs.createWriteStream('file2.txt'); 
readableStream.setEncoding('utf8'); 

var ready = 0; 
setInterval(function(){ 
    if(ready == 0) { 
    //console.log('pausing'); 
    readableStream.pause(); 
    ready = 1; 
    } 
    else { 
    //console.log('resuming'); 
    readableStream.resume(); 
    ready = 0; 
    } 
},100); 

readableStream.on('data', function(chunk) { 
    writableStream.write(chunk); 
    readableStream.pause(); 
}); 
+0

grazie sì che posso mettere in pausa il leggibile, non è così difficile, ma che ne è dell'utilizzo di un flusso through/transform per memorizzare i dati mentre attendiamo che il dispositivo scrivibile sia pronto per ricevere i dati? –

Problemi correlati