2016-03-02 11 views
6

Sto cercando di utilizzare RxJS per un semplice sondaggio breve. È necessario effettuare una richiesta ogni delay secondi sulla posizione path sul server, terminando una volta che una delle due condizioni è stata raggiunta: o la callback isComplete(data) restituisce true o ha provato il server più di maxTries. Ecco il codice di base:RxJS 5.0 meccanismo "do while"

newShortPoll(path, maxTries, delay, isComplete) { 
    return Observable.interval(delay) 
    .take(maxTries) 
    .flatMap((tryNumber) => http.get(path)) 
    .doWhile((data) => !isComplete(data)); 
    } 

Tuttavia, DoWhile non esiste in RxJS 5.0, in modo che la condizione in cui si può solo provare il server maxTries opere, grazie alla chiamata take(), ma la condizione isComplete non lo fa lavoro. Come posso fare in modo che i valori osservabili next() finchè isComplete non restituisce true, a quel punto sarà next() quel valore e complete().

Devo notare che takeWhile() non funziona per me qui. Non restituisce l'ultimo valore, che in realtà è il più importante, poiché è quando sappiamo che è stato fatto.

Grazie!

+0

Possibilmente un duplicato di: http://stackoverflow.com/questions/36007911/rxjs-poll-until-interval-done-or-correct-data-received –

+0

Non è un duplicare nel senso che la domanda è chiedere una sostituzione di 'doWhile'. –

risposta

0

Possiamo creare una funzione di utilità per creare un secondo osservabile che emetta ogni elemento emesso dall'osservabile interno; tuttavia, chiameremo la funzione onCompleted una volta la nostra condizione è soddisfatta:

function takeUntilInclusive(inner$, predicate) { 
    return Rx.Observable.create(observer => { 
     var subscription = inner$.subscribe(item => { 
      observer.onNext(item); 

      if (predicate(item)) { 
       observer.onCompleted(); 
      } 
     }, observer.onError, observer.onCompleted); 


     return() => { 
      subscription.dispose(); 
     } 
    }); 
} 

Ed ecco una rapida snippet utilizzando il nuovo metodo di utilità:

const inner$ = Rx.Observable.range(0, 4); 
const data$ = takeUntilInclusive(inner$, (x) => x > 2); 
data$.subscribe(x => console.log(x)); 

// >> 0 
// >> 1 
// >> 2 
// >> 3 

Questa risposta è basata fuori: RX Observable.TakeWhile checks condition BEFORE each element but I need to perform the check after

+0

Stranamente, questa soluzione sembra funzionare circa la metà delle volte. A volte il gruppo finale di dati arriva agli abbonati, a volte semplicemente no. Qualche idea del perché? Passare a switchMap non lo ha risolto. –

0

È possibile ottenere ciò utilizzando gli operatori retry e first.

// helper observable that can return incomplete/complete data or fail. 
 
var server = Rx.Observable.create(function (observer) { 
 
    var x = Math.random(); 
 

 
    if(x < 0.1) { 
 
    observer.next(true); 
 
    } else if (x < 0.5) { 
 
    observer.error("error"); 
 
    } else { 
 
    observer.next(false); 
 
    } 
 
    observer.complete(); 
 

 
    return function() { 
 
    }; 
 
}); 
 
    
 
function isComplete(data) { 
 
    return data; 
 
} 
 
    
 
var delay = 1000; 
 
Rx.Observable.interval(delay) 
 
    .switchMap(() => { 
 
    return server 
 
     .do((data) => { 
 
     console.log('Server returned ' + data); 
 
     },() => { 
 
     console.log('Server threw'); 
 
     }) 
 
     .retry(3); 
 
    }) 
 
    .first((data) => isComplete(data)) 
 
    .subscribe(() => { 
 
    console.log('Got completed value'); 
 
    },() => { 
 
    console.log('Got error'); 
 
    });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>