2015-02-13 10 views
9

Ho bisogno di filtrare le voci emesse da un osservabile controllando la voce su alcuni servizi web. Il normale operatore observable.filter non è adatto qui, poiché si aspetta che la funzione predicato restituisca il verdetto in modo sincrono, ma in questa situazione il verdetto può essere recuperato solo in modo asincrono.Esiste una versione "asincrona" dell'operatore filtro in RxJs?

Posso eseguire il passaggio con il seguente codice, ma mi chiedevo se ci sia un operatore migliore che posso usare per questo caso.

someObservable.flatmap(function(entry) { 
    return Rx.Observable.fromNodeCallback(someAsynCheckFunc)(entry).map(function(verdict) { 
    return { 
     verdict: verdict, 
     entry: entry 
    }; 
    }); 
}).filter(function(obj) { 
    return obj.verdict === true; 
}).map(function(obj) { 
    return obj.entry; 
}); 
+0

si può ottenere thte esempio di codice a un (non) lavoro di stato, qualcosa di 'copia/siamo paste'-able posso lavorare per aiutarti? –

+0

Non penso esista come operatore nella distribuzione RxJs. Tuttavia, puoi facilmente prendere il codice sopra riportato e trasformarlo nel tuo operatore riutilizzabile 'filterAsync'. – Brandon

risposta

9

Ecco come si sarebbe implementare un tale operatore che utilizza operatori esistenti. C'è un ostacolo che devi pensare. Poiché l'operazione di filtro è asincrona, è possibile che i nuovi elementi arrivino più velocemente di quanto l'operazione di filtro possa elaborarli. Cosa dovrebbe accadere in questo caso? Vuoi eseguire i filtri in sequenza e garantire che l'ordine dei tuoi articoli sia mantenuto? Vuoi eseguire i filtri in parallelo e accetti che i tuoi articoli possano uscire in ordine diverso?

Ecco le 2 versioni del gestore

// runs the filters in parallel (order not guaranteed) 
// predicate should return an Observable 
Rx.Observable.prototype.flatFilter = function (predicate) { 
    return this.flatMap(function (value, index) { 
     return predicate(value, index) 
      .filter(Boolean) // filter falsy values 
      .map(function() { return value; }); 
    }); 
}; 

// runs the filters sequentially (order preserved) 
// predicate should return an Observable 
Rx.Observable.prototype.concatFilter = function (predicate) { 
    return this.concatMap(function (value, index) { 
     return predicate(value, index) 
      .filter(Boolean) // filter falsy values 
      .map(function() { return value; }); 
    }); 
}; 

Usage:

var predicate = Rx.Observable.fromNodeCallback(someAsynCheckFunc); 
someObservable.concatFilter(predicate).subscribe(...); 
+1

La soluzione di Bens funziona, ma penso che Brandon sia più chiaro. Quindi scelgo questo. – xwk

+1

Dolce, grazie. Immagino che questa sia ancora la soluzione più aggiornata? – Pipo

2

Non che io sia a conoscenza. Potresti arrotolare il tuo. Non ho ancora testato questo, ma ecco un'idea:

Observable.prototype.flatFilter = function(predicate) { 
    var self = this; 
    return Observable.create(function(obs) { 
    var disposable = new CompositeDisposable(); 
    disposable.add(self.subscribe(function(x) { 
     disposable.add(predicate(x).subscribe(function(result) { 
     if(result) { 
      obs.onNext(x); 
     } 
     }, obs.onError.bind(obs))); 
    }, obs.onError.bind(obs), obs.onCompleted.bind(obs))); 
    return disposable; 
    }); 
}; 

e si potrebbe usare in questo modo:

someStream.flatFilter(function(x) { 
    return Rx.DOM.get('/isValid?check=' + x); 
}).subscribe(function(x) { 
    console.log(x); 
}); 
+0

lol ... Non ho idea del motivo per cui l'ho chiamato "flatFilter", mi è piaciuto averlo con "flatMap" credo. ;) –

+0

Ben, grazie per la soluzione. Tuttavia, mi sembra che il primo obs.onCompleted.bind (obs) non sia del tutto corretto. Sembra che il completamento dell'osservabile restituito dal predicato (x) non completi correttamente l'osservabile restituito da flatFilter(). Ho sbagliato? – xwk

+0

Sì, quella era una svista. Il primo onCompleted può essere rimosso. Lascerei comunque l'onError lì. –

Problemi correlati