2016-02-01 14 views
19

Non è al 100% chiaro per me come funziona l'operatore RxJs 5 share(), vedere qui lo latest docs. Jsbin per la domanda here.Come funziona l'operatore sharex() di RxJs 5?

Se creo un osservabile con una serie di 0 e 2, ciascun valore separato di un secondo:

var source = Rx.Observable.interval(1000) 
.take(5) 
.do(function (x) { 
    console.log('some side effect'); 
}); 

E se creo due utenti alla osservabile:

source.subscribe((n) => console.log("subscriptor 1 = " + n)); 
source.subscribe((n) => console.log("subscriptor 2 = " + n)); 

ottengo questo nella console:

"some side effect ..." 
"subscriptor 1 = 0" 
"some side effect ..." 
"subscriptor 2 = 0" 
"some side effect ..." 
"subscriptor 1 = 1" 
"some side effect ..." 
"subscriptor 2 = 1" 
"some side effect ..." 
"subscriptor 1 = 2" 
"some side effect ..." 
"subscriptor 2 = 2" 

Ho pensato che ogni sottoscrizione avrebbe sottoscritto lo stesso Osservabile, ma non sembra essere il caso! È come se l'atto di sottoscrizione crea un Osservabile completamente separato!

Ma se l'operatore share() viene aggiunto alla fonte osservabile:

var source = Rx.Observable.interval(1000) 
.take(3) 
.do(function (x) { 
    console.log('some side effect ...'); 
}) 
.share(); 

allora otteniamo questo:

"some side effect ..." 
"subscriptor 1 = 0" 
"subscriptor 2 = 0" 
"some side effect ..." 
"subscriptor 1 = 1" 
"subscriptor 2 = 1" 
"some side effect ..." 
"subscriptor 1 = 2" 
"subscriptor 2 = 2" 

Che è quello che mi sarei aspettato senza la share().

Cosa sta succedendo qui, come funziona l'operatore share()? Ogni abbonamento crea una nuova catena Observable?

risposta

15

Fare attenzione a utilizzare RxJS v5 mentre il collegamento della documentazione sembra essere RxJS v4. Non ricordo le specifiche, ma penso che l'operatore share abbia apportato alcune modifiche, in particolare quando si tratta di completare e riprendere la registrazione, ma non credetemi.

Torna alla tua domanda, come hai mostrato nel tuo studio, le tue aspettative non corrispondono al design della biblioteca. Gli osservabili calcolano pigramente il loro flusso di dati, avviando concretamente il flusso di dati quando un sottoscrittore si iscrive. Quando un secondo sottoscrittore si iscrive allo stesso osservabile, viene avviato un altro nuovo flusso di dati come se fosse il primo sottoscrittore (quindi sì, ogni sottoscrizione crea una nuova catena di osservabili come hai detto tu). Questo è ciò che viene coniato nella terminologia RxJS come osservabile a freddo e questo è il comportamento predefinito per RxJS osservabile. Se si desidera un osservabile che invia i propri dati agli abbonati al momento in cui arrivano i dati, viene coniato un osservabile caldo e un modo per ottenere un osservabile caldo è quello di utilizzare l'operatore share.

È possibile trovare i flussi di sottoscrizione e dati illustrati qui: Hot and Cold observables : are there 'hot' and 'cold' operators? (questo è valido per RxJS v4, ma la maggior parte di esso è valido per v5).

10

parti rende il osservabile "caldo", se sono soddisfatte queste 2 condizioni:

  1. il numero di abbonati> 0
  2. E osservabile non ha completato

Scenario1: numero di abbonati > 0 e osservabile non è completato prima di un nuovo abbonamento

var shared = rx.Observable.interval(5000).take(2).share(); 
var startTime = Date.now(); 
var log = (x) => (value) => { 
    console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`); 
}; 

var observer1 = shared.subscribe(log('observer1')), 
    observer2; 

setTimeout(()=>{ 
    observer2 = shared.subscribe(log('observer2')); 
}, 3000); 

// emission for both observer 1 and observer 2, with the samve value at startTime + 5 seconds 
// another emission for both observers at: startTime + 10 seconds 

Scenario 2: il numero di abbonati è zero prima di una nuova sottoscrizione. Diventa "freddo"

var shared = rx.Observable.interval(5000).take(2).share(); 
    var startTime = Date.now(); 
    var log = (x) => (value) => { 
    console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`); 
}; 

var observer1 = shared.subscribe(log('observer1')), 
    observer2; 

setTimeout(()=>{ 
    observer1.unsubscribe(); 
}, 1000); 

setTimeout(()=>{ 
    observer2 = shared.subscribe(log('observer2')); // number of subscribers is 0 at this time 
}, 3000); 
// observer2's onNext is called at startTime + 8 seconds 
// observer2's onNext is called at startTime + 13 seconds 

Scenario 3: quando osservabile è stato completato prima di un nuovo abbonamento. Diventa "freddo"

var shared = rx.Observable.interval(5000).take(2).share(); 
    var startTime = Date.now(); 
    var log = (x) => (value) => { 
     console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`); 
    }; 

var observer1 = shared.subscribe(log('observer1')), 
    observer2; 

setTimeout(()=>{ 
    observer2 = shared.subscribe(log('observer2')); 
}, 12000); 

// 2 emission for observable 1, at startTime + 5 secs, and at startTime + 10secs 
// 2 emissions for observable 2,at startTime + 12 + 5 secs, and at startTime + 12 + 10secs