5

Sto leggendo un file (300.000 righe) in node.js. Voglio inviare righe in lotti di 5.000 linee a un'altra applicazione (Elasticsearch) per memorizzarle. Quindi, ogni volta che finisco di leggere 5.000 righe, voglio inviarle in massa a Elasticsearch tramite un'API per archiviarle e quindi continuare a leggere il resto del file e inviare ogni 5.000 linee alla rinfusa.Come leggere le righe di un file con node.js o javascript con ritardo, non in comportamento non bloccante?

Se voglio usare Java (o qualsiasi altro linguaggio di blocco come C, C++, Python, ecc) per questo compito, farò qualcosa di simile:

int countLines = 0; 
String bulkString = ""; 
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("filePath.txt"))); 
while ((currentLine = br.readLine()) != null) { 
    countLines++; 
    bulkString += currentLine; 
    if(countLines >= 5000){ 
      //send bulkString to Elasticsearch via APIs 
      countLines = 0; 
      bulkString = ""; 
    } 
} 

Se voglio fare la stessa cosa con node.js, farò:

var countLines = 0; 
var bulkString = ""; 
var instream = fs.createReadStream('filePath.txt'); 
var rl = readline.createInterface(instream, outstream); 
rl.on('line', function(line) { 
    if(countLines >= 5000){ 
      //send bulkString to via APIs 
      client.bulk({ 
      index: 'indexName', 
      type: 'type', 
      body: [bulkString] 
      }, function (error, response) { 
      //task is done 
      }); 
      countLines = 0; 
      bulkString = ""; 
    } 
} 

il problema con node.js è che è non bloccante in modo da non attendere la prima risposta API prima di inviare il seguente lotto di linee. So che questo potrebbe essere un vantaggio per done.js perché non attende l'I/O, ma il problema è che invia troppi dati a Elasticsearch. Pertanto la coda di Elasticsearch si riempirà e genererà eccezioni.

La mia domanda è che come posso fare in modo che node.js attenda la risposta dall'API prima che continui a leggere le righe successive o prima che invii il successivo gruppo di righe a Elasticsearch.

So che posso impostare alcuni parametri in Elasticsearch per aumentare la dimensione della coda, ma sono interessato a bloccare il comportamento di node.js per questo problema. Ho familiarità con il concetto di callback, ma non riesco a pensare a un modo per utilizzare le callback in questo scenario per impedire a node.js di chiamare l'API Elasticsearch in modalità non bloccante.

risposta

4

La risposta di Pierre è corretta. Voglio solo inviare un codice che mostri come possiamo trarre vantaggio dal concetto non bloccante di node.js ma, allo stesso tempo, non sovraccaricare Elasticsearch con troppe richieste contemporaneamente.

Ecco un codice pseudo che è possibile utilizzare per dare il codice flessibilità impostando il limite di dimensione della coda:

var countLines = 0; 
var bulkString = ""; 
var queueSize = 3;//maximum of 3 requests will be sent to the Elasticsearch server 
var batchesAlreadyInQueue = 0; 
var instream = fs.createReadStream('filePath.txt'); 
var rl = readline.createInterface(instream, outstream); 
rl.on('line', function(line) { 
    if(countLines >= 5000){ 
      //send bulkString to via APIs 
      client.bulk({ 
      index: 'indexName', 
      type: 'type', 
      body: [bulkString] 
      }, function (error, response) { 
       //task is done 
       batchesAlreadyInQueue--;//we will decrease a number of requests that are already sent to the Elasticsearch when we hear back from one of the requests 
       rl.resume(); 
      }); 
      if(batchesAlreadyInQueue >= queueSize){ 
       rl.pause(); 
      } 
      countLines = 0; 
      bulkString = ""; 
    } 
} 
2

utilizzare rl.pause() subito dopo il tuo se e rl.resume() dopo il //task is done.

Nota che potresti avere qualche altro evento di linea dopo aver chiamato la pausa.

+0

Grazie, ha lavorato per me. – Soheil

Problemi correlati