2016-01-10 13 views
10

Sono nuovo di zecca per Rx e trovo difficile trovare la documentazione sulla composizione delle promesse in modo tale che i dati dalla prima promessa passino al secondo e così via. Ecco tre promesse di base, i calcoli sui dati non sono importanti, solo che qualcosa di asincrono deve essere fatto usando i dati della promessa precedente.Composizione promessa RxJS (dati di passaggio)

const p1 =() => Promise.resolve(1); 
const p2 = x => { const val = x + 1; return Promise.resolve(val); }; 
const p3 = x => { 
     const isEven = x => x % 2 === 0; 
     return Promise.resolve(isEven(x)); 
}; 

Il modo tradizionale per ottenere la composizione di cui sto parlando:

pl().then(p2).then(p3).then(console.log); 

mio realizzazione preferita è di Ramda composeP e pipeP:

R.pipeP(p1, p2, p3, console.log)() 

Sembra probabile Rx potrebbe essere in grado gestire questo tipo di situazione in modo abbastanza fluido. Tuttavia, la più vicina che ho trovato finora è dal RxJS a asincrona (biblioteca) Confronto qui https://github.com/Reactive-Extensions/RxJS/blob/master/doc/mapping/async/comparing.md:

var Rx = require('rx'), 
    fs = require('fs'), 
    path = require('path'); 
var file = path.join(__dirname, 'file.txt'), 
    dest = path.join(__dirname, 'file1.txt'), 
    exists = Rx.Observable.fromCallback(fs.exists), 
    rename = Rx.Observable.fromNodeCallback(fs.rename), 
    stat = Rx.Observable.fromNodeCallback(fs.stat); 
exists(file) 
    .concatMap(function (flag) { 
    return flag ? 
     rename(file, dest) : 
     Rx.Observable.throw(new Error('File does not exist.')); 
    }) 
    .concatMap(function() { 
     return stat(dest); 
    }) 
    .forEach(
     function (fsStat) { 
      console.log(JSON.stringify(fsStat)); 
     }, 
     function (err) { 
      console.log(err); 
     } 
    ); 

concatMap sembra promettente, ma il codice di cui sopra sembra piuttosto orribile. Avevo anche problemi con il mio esempio perché Rx.Observable.fromPromise (p1) non funzionerà in quanto si aspetta una promessa in sé, non una funzione, e Rx.Observable.defer (p1) non sembra passare parametri come il esempio.

Grazie!

domanda simile, ma senza passaggio di dati: Chaining promises with RxJS

+0

le tue promesse devono essere racchiuse in una funzione? – user3743222

+0

Solo in questo caso se hai definito una promessa inline al di fuori di una catena Promise o osservabile con qualcosa come const p1 = new Promise ((resolve, reject) => {}) inizierà immediatamente la valutazione e non potrebbe ricevere i dati dal precedente promessa eseguita. O sbaglio sulla valutazione immediata? –

risposta

14

non ho letto tutto, ma se si vuole raggiungere lo stesso pl().then(p2).then(p3).then(console.log);, con p essendo funzione promesse di ritorno, si potrebbe fare qualcosa di simile (ad esempio here)

Rx.Observable.fromPromise(p1()) 
      .flatMap(function(p1_result){return p2(p1_result);}) 
      .flatMap(function(p2_result){return p3(p2_result);}) 

o il più simmetrica:

var chainedPromises$ = 
    Rx.Observable.just() 
      .flatMap(p1) 
      .flatMap(p2) 
      .flatMap(p3); 

Ora, se si desidera eseguire richiamata in modo sequenziale attraverso avvolto fromCallback o fromNodeCallback, si potrebbe fare qualcosa di simile:

function rename (flag){ 
    return flag 
      ? rename(file,dest).flatMap(return Rx.Observable.just(dest)) 
      : Rx.Observable.throw(new Error('File does not exist.')); 
} 

Rx.Observable.just(file) 
      .flatMap(exists) 
      .flatMap(rename) 
      .flatMap(stat) 

Quest'ultimo codice è testato, in modo da tenermi aggiornato se funziona. Ultimo commento, questo dovrebbe funzionare se in ogni punto hai solo un valore prodotto (come una promessa). Se dovessi avere più file invece di uno, con flatMap potresti ricevere problemi di ordine (se l'ordine ti riguarda), in tal caso, potresti utilizzare concatMap in sostituzione.

+0

In qualche modo speravo solo un'astrazione leggermente più alta che fosse qualcosa come flatMapAll (p1, p2, p3). Particolarmente utile se una sequenza di promesse viene generata tramite una mappa, ad es. const ps = map ((x) => promisedFsReadFileCurriedSoThatItDoesSomethingWithPreviousFileData (x), ['1.txt', '2.txt', '3.txt']); . Rx.Observable.just() flatMapAll (... ps); (solo pseudo codice). Ma questa è sicuramente una soluzione gestibile e probabilmente c'è un modo per farlo con il mapping di Prometeo o qualcosa del genere. Grazie!Anche lo –

+0

non ha testato il secondo esempio di codice, ma i primi funzionano come un incantesimo –

+0

puoi fare 'flatMapAll' tu stesso. 'flatMapAll :: Rx.Observable -> [a -> a] -> Rx.Observable'. 'flatMapAll = (source, fn_array) -> fn_array.reduce ((acc, fn) -> acc.flatMap (fn), source)'. In js, 'Rx.Observable.prototype.flatMapAll = function (fn_array) {source = this; return ...} ' – user3743222