2016-01-25 25 views
11

Ho un osservatore temporale molto semplice osservabile e voglio avviare/interrompere la trasmissione senza disconnettere gli abbonati (che dovrebbero sedersi e attendere indipendentemente dallo stato osservabile). È possibile, e se sì, come?come può avviare e interrompere un intervallo osservabile in RXJS?

var source = Rx.Observable 
    .interval(500) 
    .timeInterval() 
    .map(function (x) { return x.value + ':' + x.interval; }) 
    .take(10); 

    var subscription = source.subscribe(
    function (x) { 
    $("#result").append('Next: ' + x + ' '); 
    }, 
    function (err) { 
    $("#result").append('Error: ' + err); 
    }, 
    function() { 
    $("#result").append('Completed'); 
    }); 

commento generale: la maggior parte degli esempi che ho visto mostrano come definire osservabili e abbonati. come posso influenzare il comportamento degli oggetti esistenti?

risposta

14

Dipende da quale è la fonte del segnale di arresto/ripresa. Il modo più semplice a cui riesco a pensare è con lo pausable operator, che come dice la documentazione funziona meglio con gli osservabili caldi. Quindi nel seguente codice di esempio, ho rimosso il take(10) (il tuo segnale pausabile ora passa attraverso il soggetto pauser) e ho aggiunto share per trasformare il tuo osservabile in uno caldo.

var pauser = new Rx.Subject(); 
var source = Rx.Observable 
    .interval(500) 
    .timeInterval() 
    .map(function (x) { return x.value + ':' + x.interval; }) 
    .share() 
    .pausable(pauser); 

var subscription = source.subscribe(
    function (x) { 
    $("#result").append('Next: ' + x + ' '); 
    }, 
    function (err) { 
    $("#result").append('Error: ' + err); 
    }, 
    function() { 
    $("#result").append('Completed'); 
}); 

    // To begin the flow 
pauser.onNext(true); // or source.resume(); 

// To pause the flow at any point 
pauser.onNext(false); // or source.pause(); 

Ecco un more sophisticated example che mettere in pausa la sorgente ogni 10 articoli:

// Helper functions 
function emits (who, who_) {return function (x) { 
who.innerHTML = [who.innerHTML, who_ + " emits " + JSON.stringify(x)].join("\n"); 
};} 

var pauser = new Rx.Subject(); 
var source = Rx.Observable 
    .interval(500) 
    .timeInterval() 
    .map(function (x) { return x.value + ':' + x.interval; }) 
    .share(); 
var pausableSource = source 
    .pausable(pauser); 

source 
    .scan(function (acc, _){return acc+1}, 0) 
    .map(function(counter){return !!(parseInt(counter/10) % 2)}) 
    .do(emits(ta_validation, 'scan')) 
    .subscribe(pauser); 

var subscription = pausableSource.subscribe(
    function (x) { 
    $("#ta_result").append('Next: ' + x + ' '); 
    }, 
    function (err) { 
    $("#ta_result").append('Error: ' + err); 
    }, 
    function() { 
    $("#ta_result").append('Completed'); 
}); 

Si dovrebbe avere ormai la tua risposta alla seconda domanda. Combina gli osservabili che ti vengono forniti con gli operatori RxJS pertinenti per realizzare il tuo caso d'uso. Questo è quello che ho fatto qui.

+1

grazie per la vostra risposta. Ive ha aggiunto un pulsante per mettere in pausa/riprodurre e ho notato che il flusso si sta reimpostando su ogni curriculum quindi non sta facendo esattamente quello che mi serve, che è un fermo/ripresa senza riavvio. –

+0

Conoscete la differenza tra un osservabile caldo e uno freddo. Dai un'occhiata qui: http://stackoverflow.com/questions/32190445/hot-and-cold-observables-are-there-hot-and-cold-operators/34669444#34669444. Se il tuo stream si sta reimpostando, probabilmente significa che stai usando un osservabile a freddo, cioè un osservabile che viene "ripristinato" ogni volta che ha un nuovo utente. L'unico modo per essere sicuri è vedere il tuo codice. Nel mio esempio, ho aggiunto un '.share()' per assicurarmi che lo stream fosse caldo prima di passarlo all'operatore 'pausable'. Altrimenti avrei gli stessi problemi che hai ora. – user3743222

Problemi correlati