2016-02-03 11 views
5

Sono nuovo di RxJS e mi chiedevo se qualcuno potesse aiutarmi.Stream sincrono di risposte da un flusso di richieste con RxJS

Desidero creare un flusso sincrono di risposte (preferibilmente con le richieste corrispondenti) da un flusso di richieste (dati di carico utile).

Fondamentalmente voglio che le richieste vengano inviate una alla volta, ognuna in attesa della risposta dall'ultima.

Ho provato questo, ma manda tutto in una volta (jsbin):

var requestStream, responseStream; 
 
requestStream = Rx.Observable.from(['a','b','c','d','e']); 
 

 
responseStream = requestStream.flatMap(
 
    sendRequest, 
 
    (val, response)=>{ return {val, response}; } 
 
); 
 

 
responseStream.subscribe(
 
    item=>{ 
 
    console.log(item); 
 
    }, 
 
    err => { 
 
    console.err(err); 
 
    }, 
 
()=>{ 
 
    console.log('Done'); 
 
    } 
 
); 
 

 
function sendRequest(val) { 
 
    return new Promise((resolve,reject)=>{ 
 
    setTimeout(()=>{resolve('result for '+val);},1000); 
 
    }); 
 
};

i seguenti lavori, in una certa misura, ma non usa flusso per i dati della richiesta (jsbin).

var data, responseStream; 
 
data = ['a','b','c','d','e']; 
 
responseStream = Rx.Observable.create(observer=>{ 
 
    var sendNext = function(){ 
 
    var val = data.shift(); 
 
    if (!val) { 
 
     observer.onCompleted(); 
 
     return; 
 
    } 
 
    sendRequest(val).then(response=>{ 
 
     observer.onNext({val, response}); 
 
     sendNext(); 
 
    }); 
 
    }; 
 
    sendNext(); 
 
}); 
 

 
responseStream.subscribe(
 
    item=>{ 
 
    console.log(item); 
 
    }, 
 
    err => { 
 
    console.err(err); 
 
    }, 
 
()=>{ 
 
    console.log('Done'); 
 
    } 
 
); 
 

 
function sendRequest(val) { 
 
    return new Promise((resolve,reject)=>{ 
 
    setTimeout(()=>{resolve('response for '+val);},Math.random() * 2500 + 500); 
 
    }); 
 
};

Grazie!

EDIT:

Giusto per chiarire, questo è quello che volevo ottenere:

"Invia A, quando si riceve la risposta per A, inviare B, quando si riceve la risposta per la B, inviare C, ecc ..."

Utilizzando concatMap e rinviare, come suggerito da user3743222, sembra farlo (jsbin):

responseStream = requestStream.concatMap(
    (val)=>{ 
    return Rx.Observable.defer(()=>{ 
     return sendRequest(val); 
    }); 
    }, 
    (val, response)=>{ return {val, response}; } 
); 

risposta

3

Prova a sostituire flatMap con concatMap nel tuo primo esempio di codice e fammi sapere se il comportamento risultante corrisponde a ciò che stai cercando.

responseStream = requestStream.concatMap(//I replaced `flatMap` 
    sendRequest, 
    (val, response)=>{ return {val, response}; } 
); 

Fondamentalmente concatMap ha una firma simile rispetto flatMap, la differenza di comportamento è che si attenderà l'attuale osservabile essere appiattito per completare prima di procedere con il successivo. Quindi:

  • a requestStream il valore verrà inviato all'operatore concatMap.
  • all'operatore concatMap genererà un sendRequest osservabile, e qualunque valori da quella osservabile (sembra essere una tupla (val, response)) passerà attraverso la funzione di selezione e il risultato oggetto che verrà passato a valle
  • quando che sendRequest completa, viene elaborato un altro valore requestStream.
  • In breve, le vostre richieste saranno trattati uno per uno

In alternativa, forse si vuole utilizzare defer di rinviare l'esecuzione della sendRequest.

responseStream = requestStream.concatMap(//I replaced `flatMap` 
    function(x){return Rx.Observable.defer(function(){return sendRequest(x);})}, 
    (val, response)=>{ return {val, response}; } 
); 
+0

Grazie per la risposta. Ho provato la tua soluzione, ma le richieste sono ancora tutte inviate immediatamente. La documentazione suggerisce che flatMap può causare interleaving mentre concatMap no. Sembra che la differenza sia nell'ordinare. Ha senso usare concatMap ma non produce ancora il comportamento desiderato: Invia A, quando ricevi risposta per A, invia B, quando ricevi risposta per B, invia C, ecc. – jamesref

+0

Forse ho frainteso ciò che volevi. Puoi provare in questo caso "differire"? Aggiornerò il codice – user3743222

+0

Grazie! Sembra che funzioni. – jamesref

Problemi correlati