2014-09-30 11 views
5

Sto cercando di utilizzare RxJS per scrivere uno script per elaborare diverse centinaia di file di registro, ognuno dei quali è di circa 1 GB. Lo scheletro dello script appare comeCome limitare la concorrenza di flatMap?

Rx.Observable.from(arrayOfLogFilePath) 
.flatMap(function(logFilePath){ 
    return Rx.Node.fromReadStream(logFilePath) 
    .filter(filterLogLine) 
}) 
.groupBy(someGroupingFunc) 
.map(someFurtherProcessing) 
.subscribe(...) 

Il codice funziona, a meno di notare che la fase di filtraggio di tutti i file di registro avrà inizio contemporaneamente. Tuttavia, dal punto di vista delle prestazioni IO del file system, è preferibile elaborare un file dopo l'altro (o almeno limitare la concorrenza a pochi file anziché aprire tutte le centinaia di file nello stesso momento). A questo proposito, come posso implementarlo in un "modo funzionale reattivo"?

Avevo pensato all'utilità di pianificazione ma non riuscivo a capire come può essere d'aiuto.

+0

Ho la stessa domanda, ma con Rx.NET. È possibile? http://stackoverflow.com/questions/37345516/limiting-concurrent-requests-using-rx-and-selectmany – SuperJMN

risposta

12

È possibile utilizzare .merge(maxConcurrent) per limitare la concorrenza. Poiché .merge(maxConcurrent) appiattisce un osservabile (osservabile di osservabili) in un osservabile, è necessario sostituire lo .flatMap con .map in modo che l'output sia un metaosservabile ("unflat"), quindi si chiama .merge(maxConcurrent).

Rx.Observable.from(arrayOfLogFilePath) 
.map(function(logFilePath){ 
    return Rx.Node.fromReadStream(logFilePath) 
    .filter(filterLogLine) 
}) 
.merge(2) // 2 concurrent 
.groupBy(someGroupingFunc) 
.map(someFurtherProcessing) 
.subscribe(...) 

Questo codice non è stato testato (dal momento che non hanno accesso all'ambiente di sviluppo di avere), ma questo è il modo di procedere. RxJS non ha molti operatori con parametri di concorrenza, ma puoi fare quasi sempre ciò che ti serve con .merge(maxConcurrent).

+1

Questo è esattamente quello che sto cercando di mettermi al lavoro. Ho un elenco di 500 URL da caricare e non voglio dare il via a tutte le richieste allo stesso tempo. Ho usato map (5) ma non funziona ... Tutte le richieste sono fatte allo stesso tempo. – Roaders

+0

@Roaders hai ottenuto questa soluzione per funzionare? Sto provando lo stesso. Ma tutte le richieste vengono licenziate contemporaneamente. Ho cercato ovunque su google e non ho trovato nulla. – Diego

+0

Se ad esempio si effettua una chiamata http asincrona, è necessario avvolgerla in un Rx.defer() in modo che Rx possa decidere quando viene effettuata la chiamata (e riprovare se fallisce, ad esempio) – Roaders

0

Ho appena risolto un problema simile con RxJs 5, quindi spero che la soluzione possa aiutare gli altri con un problema simile.

// Simulate always processing 2 requests in parallel (when one is finished it starts processing one more), 
 
// retry two times, push error on stream if retry fails. 
 

 
//const Rx = require('rxjs-es6/Rx'); 
 

 
// -- Global variabel just to show that it works. -- 
 
let parallelRequests = 0; 
 
// -------------------------------------------------- 
 

 
function simulateRequest(req) { 
 
    console.log("Request " + req); 
 
    // --- To log retries --- 
 
    var retry = 0; 
 
    // ---------------------- 
 

 
    // Can't retry a promise, need to restart before the promise is made. 
 
    return Rx.Observable.of(req).flatMap(req => new Promise((resolve, reject) => { 
 

 
     var random = Math.floor(Math.random() * 2000); 
 
     // -- To show that it works -- 
 
     if (retry) { 
 
      console.log("Retrying request " + req + " ,retry " + retry); 
 
     } else { 
 

 
      parallelRequests++; 
 
     } 
 
     // --------------------------- 
 
     setTimeout(() => { 
 
      if (random < 900) { 
 
       retry++; 
 
       return reject(req + " !!!FAILED!!!"); 
 
      } 
 

 
      return resolve(req); 
 
     }, random); 
 
    })).retry(2).catch(e => Rx.Observable.of(e)); 
 
} 
 

 
Rx.Observable.range(1, 10) 
 
    .flatMap(e => simulateRequest(e), null, 2) 
 
    // -- To show that it works -- 
 
    .do(() => { 
 
     console.log("ParallelRequests " + parallelRequests); 
 
     parallelRequests--; 
 
    }) 
 
    // --------------------------- 
 
    .subscribe(e => console.log("Response from request " + e), e => console.log("Should not happen, error: " + e), e => console.log("Finished"));
<script src="https://npmcdn.com/@reactivex/[email protected]/dist/global/Rx.umd.js"></script>