Nota, sto usando un intervallo qui per simulare che lo scrittore sia in grado di leggere o meno. Lo si può fare in qualsiasi modo si desidera cioè se lo scrittore restituisce false si dovrebbe aggiornare lo stato di avviare il buffering ecc Credo che l'ultima riga è ciò che si vuole cioè
r.pipe(b).pipe(w);
Questo si legge come segue
readStrem.pipe(transformBbuffer).pipe(writeStream);
Il codice di esempio, ci sono alcune modifiche che possiamo apportare al buffer di tutti i dati. Descriverò dopo il codice. Tutto quello che c'è da sapere su corsi d'acqua e sei più nella documentazione, penso che potevano fare con gli esempi più completi ma sono abbastanza buono come è ...
https://nodejs.org/api/stream.html#stream_class_stream_transform_1
Questo il codice.
var fs = require('fs');
var stream = require('stream')
const util = require('util');
//const StringDecoder = require('string_decoder').StringDecoder;
const Transform = require('stream').Transform;
var check_buff = 0;
var DRAIN_ME = 0;
var r = fs.createReadStream('file1.txt').setEncoding('utf8');
var w = fs.createWriteStream('file2.txt');
var BufferStream = function() {
stream.Transform.apply(this, arguments);
this.buffer = [];
};
util.inherits(BufferStream, stream.Transform);
var intId;
intId = setInterval(function(){
if(check_buff % 3 == 0) {
DRAIN_ME = 1;
return;
}
DRAIN_ME = 0;
},10);
BufferStream.prototype._transform = function (chunk, encoding, done) {
this.buffer.push(String(chunk));
while(DRAIN_ME > 0 && this.buffer.length > 0) {
this.push(this.buffer.shift());
}
console.log(chunk.length);
console.log(this.buffer.length);
done();
};
var b = new BufferStream();
b.on('end', function(chunk) {
clearInterval(intId);
});
r.pipe(b).pipe(w);
Sto cercando il modo canonico di implementare una trasformazione/attraverso flusso, che memorizza tutti i dati fino a quando il tubo è chiamata su di esso.
apportare le seguenti modifiche
BufferStream.prototype._transform = function (chunk, encoding, done) {
this.buffer.push(String(chunk));
console.log(chunk.length);
console.log(this.buffer.length);
done();
};
......
BufferStream.prototype._flush = function (cb) {
var len = this.buffer.length;
for (var i = 0; i < len; i++) {
this.push(this.buffer.shift());
};
cb();
};
Si può anche mettere in pausa il flusso leggibile che in effetti mettere in pausa il flusso scrivibile perché si ferma la ricezione dei dati ad esempio ...
Per testare questo creare un file abbastanza grande su disco cioè 100 MB o più ed esegui questo ...
var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');
readableStream.setEncoding('utf8');
readableStream.on('data', function(chunk) {
var ready = 0;
readableStream.pause();
setInterval(function(){
if(ready == 0) {
//console.log('pausing');
readableStream.pause();
ready = 1;
}
else {
//console.log('resuming');
readableStream.resume();
ready = 0;
}
},100);
writableStream.write(chunk);
});
Il motivo della pausa immediata è perché nel momento in cui è scattato l'intervallo 10ms
il file potrebbe essere già stato scritto. Ci sono variazioni su questo cioè ...
var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');
readableStream.setEncoding('utf8');
var ready = 0;
setInterval(function(){
if(ready == 0) {
//console.log('pausing');
readableStream.pause();
ready = 1;
}
else {
//console.log('resuming');
readableStream.resume();
ready = 0;
}
},100);
readableStream.on('data', function(chunk) {
writableStream.write(chunk);
readableStream.pause();
});
si presenta come la risposta accettata qui è vicino a quello che sto cercando http://stackoverflow.com/questions/20317759/implementing-a-buffered-transform-stream –