2015-08-04 11 views
8

Ho cercato di utilizzare uno stream leggibile e uno di trasformazione per elaborare un file molto grande. Il problema che mi sembra di incontrare è che se non metto un flusso scrivibile alla fine, il programma sembra terminare prima che il risultato venga restituito.Node.js Stream leggibili per trasformare

Esempio: rstream.pipe(split()).pipe(tstream)

mio tstream ha un emettitore che emette quando un contatore colpisce una soglia. Quando quella soglia è impostata su un numero basso, ottengo un risultato, ma quando è alto, non restituisce nulla. Se lo instrado a un autore di file, restituisce sempre un risultato. Mi manca qualcosa di ovvio?

codice:

// Dependencies 
var fs = require('fs'); 
var rstream = fs.createReadStream('file'); 
var wstream = fs.createWriteStream('output'); 
var split = require('split'); // used for separating stream by new line 
var QTransformStream = require('./transform'); 

var qtransformstream = new QTransformStream(); 
qtransformstream.on('completed', function(result) { 
    console.log('Result: ' + result); 
}); 
exports.getQ = function getQ(filename, callback) { 

    // THIS WORKS if i have a low counter for qtransformstream, 
    // but when it's high, I do not get a result 
    // rstream.pipe(split()).pipe(qtransformstream); 

    // this always works 
    rstream.pipe(split()).pipe(qtransformstream).pipe(wstream); 

}; 

Ecco il codice per il Qtransformstream

// Dependencies 
var Transform = require('stream').Transform, 
    util = require('util'); 
// Constructor, takes in the Quser as an input 
var TransformStream = function(Quser) { 
    // Create this as a Transform Stream 
    Transform.call(this, { 
     objectMode: true 
    }); 
    // Default the Qbase to 32 as an assumption 
    this.Qbase = 32; 
    if (Quser) { 
     this.Quser = Quser; 
    } else { 
     this.Quser = 20; 
    } 
    this.Qpass = this.Quser + this.Qbase; 
    this.Counter = 0; 
    // Variables used as intermediates 
    this.Qmin = 120; 
    this.Qmax = 0; 
}; 
// Extend the transform object 
util.inherits(TransformStream, Transform); 
// The Transformation to get the Qbase and Qpass 
TransformStream.prototype._transform = function(chunk, encoding, callback) { 
    var Qmin = this.Qmin; 
    var Qmax = this.Qmax; 
    var Qbase = this.Qbase; 
    var Quser = this.Quser; 
    this.Counter++; 
    // Stop the stream after 100 reads and emit the data 
    if (this.Counter === 100) { 
     this.emit('completed', this.Qbase, this.Quser); 
    } 
    // do some calcs on this.Qbase 

    this.push('something not important'); 
    callback(); 
}; 
// export the object 
module.exports = TransformStream; 
+0

Puoi pubblicare il codice per l'implementazione 'QTransformStream'? – mscdex

+0

Quante linee hai nel file di input e qual è il valore massimo del contatore in quel caso. Se il valore del contatore è maggiore dei numeri di riga, l'evento 'completato' non emetterà. Inoltre è necessario premere 'null' per terminare il flusso. Non sei sicuro di cosa hai in "qualcosa di non importante", ma a un certo punto dovrebbe esserci un "nulla". – hassansin

+0

Ci sono meno linee rispetto al contatore, circa 7000 linee. Funziona quando lo canalizzo in un flusso di scrittura. Un flusso di trasformazione deve avere una spinta (null) per funzionare? – ace040686

risposta

6

EDIT:

Inoltre, non so quanto in alto il vostro contatore va, ma se si riempire il buffer smetterà di passare i dati al flusso di trasformazione nel qual caso completed non viene mai effettivamente colpito perché r arrivare al limite del contatore. Prova a cambiare il tuo highwatermark.

EDIT 2: un po 'meglio Spiegazione

Come ben sapete un transform streamè un flusso duplex che in pratica significa che può accettare dati da una sorgente, ed è possibile inviare i dati ad una destinazione. Questo è comunemente indicato come lettura e scrittura rispettivamente. Il transform stream eredita sia da read stream sia da write stream implementato da Node.js. C'è tuttavia un avvertimento, il transform streamnon deve implementare le funzioni _read o _write. In questo senso si può pensare ad esso come il meno conosciuto passthrough stream.

Se si pensa al fatto che lo transform stream implementa lo write stream, è necessario considerare anche il fatto che lo stream di scrittura ha sempre una destinazione in cui scaricare i suoi contenuti. Il problema che si sta verificando con è che quando si crea un transform stream non è possibile specificare un luogo in cui inviare i propri contenuti. L'unico modo per passare completamente i dati attraverso il proprio stream di trasformazione consiste nel reindirizzarli a uno stream di scrittura, altrimenti gli stream vengono sottoposti a backup e non possono accettare più dati, perché non c'è spazio per i dati.

Questo è il motivo per cui quando si esegue il piping su un flusso di scrittura funziona sempre. Il flusso di scrittura sta alleggerendo il backup dei dati inviando i dati a una destinazione, quindi tutti i dati verranno inviati e l'evento di completamento verrà emesso.

Il motivo per cui il codice funziona senza il flusso di scrittura quando la dimensione del campione è piccola è che non si sta riempiendo il flusso, quindi il flusso di trasformazione può accettare dati sufficienti per consentire il raggiungimento dell'evento/soglia completo . Poiché la soglia aumenta la quantità di dati che il tuo stream può accettare senza inviarlo a un altro luogo (un flusso di scrittura) rimane lo stesso. Questo fa sì che il tuo stream venga sottoposto a backup e non può più accettare i dati, il che significa che l'evento completato non verrà mai emesso.

Mi azzarderei a dire che se aumenti il ​​tuo highwatermark per il flusso di trasformazione sarai in grado di aumentare la soglia e continuare a far funzionare il codice. Questo metodo tuttavia non è corretto. Tubo vostro flusso in un flusso di scrittura che invierà i dati al dev/null il modo di creat quel flusso di scrittura è:

var writer = fs.createWriteStream('/dev/null'); 

La sezione nella documentazione Node.JS sul buffering spiegare l'errore si esegue in.

+0

Gli stream nel nodo non sono così semplici come sembrano. Mi piacerebbe vedere una buona spiegazione dettagliata di queste sottigliezze. – thorn

+0

Ho cercato di fare una spiegazione migliore, fammi sapere se ce ne sono parte che non sono chiari. – RadleyMith

1

Non interrompere _trasformare ed elaborare molto lontano. Prova:

this.emit('completed', ...); 
this.end(); 

Ecco perché 'programma sembra terminare prima che il risultato viene restituito'

E non lo fanno in uscita i dati inutili:

var wstream = fs.createWriteStream('/dev/null'); 

Buona fortuna)

1

Suggerirei di utilizzare un Writable anziché un flusso di trasformazione. Quindi rinomina _transform a _write e il tuo codice consumerà il flusso se lo instradi. Un flusso di trasformazione, come già sottolineato da @Bradgnar, ha bisogno di un consumatore o lo stream di stop the readable da spingere più dati nel suo buffer.