Ho un osservabile che emette le stringhe e desidero raggrupparle per primo carattere. E 'facile farlo con groupBy
così:RxJava: Come raggruppare un numero arbitrario di elementi utilizzando la mia funzione di confine
Observable<String> rows = Observable.just("aa", "ab", "ac", "bb", "bc", "cc");
Observable<List<String>> groupedRows = rows.groupBy(new Func1<String, Character>() {
public Character call(String row) {
return row.charAt(0);
}
}).flatMap(new Func1<GroupedObservable<Character, String>, Observable<List<String>>>() {
public Observable<List<String>> call(GroupedObservable<Character, String> group) {
return group.toList();
}
});
groupedRows.toBlocking().forEach(new Action1<List<String>>() {
public void call(List<String> group) {
System.out.println(group);
}
});
// Output:
// [aa, ab, ac]
// [bb, bc]
// [cc]
Ma non è un bene per i miei scopi, perché groupBy
completa solo ogni gruppo quando la sorgente emette osservabili onComplete
. Quindi, se ho un sacco di righe, saranno completamente raccolte nella memoria e solo nell'ultima riga "arrossate" e scritte in output.
Ho bisogno di qualcosa come l'operatore buffer
, ma con la mia funzione che denota il limite di ogni gruppo. Ho implementato in quel modo (sapendo che le righe sono sempre ordinati alphabeticaly):
Observable<String> rows = Observable.just("aa", "ab", "ac", "bb", "bc", "cc");
ConnectableObservable<String> connectableRows = rows.publish();
Observable<String> boundarySelector = connectableRows.filter(new Func1<String, Boolean>() {
private char lastChar = 0;
public Boolean call(String row) {
char currentChar = row.charAt(0);
boolean isNewGroup = lastChar != 0 && (currentChar != lastChar);
lastChar = currentChar;
return isNewGroup;
}
});
Observable<List<String>> groupedRows = connectableRows.buffer(boundarySelector);
connectableRows.connect();
groupedRows.toBlocking().forEach(new Action1<List<String>>() {
public void call(List<String> group) {
System.out.println(group);
}
});
// Output:
// []
// []
// []
Non funziona, perché boundarySelector
è "mangiare" le righe, e penso che sia strano perché ho utilizzate in modo specifico ConnectableObservable
per indicare che Ho bisogno di due abbonati (boundarySelector
e groupedRows
) prima che l'rows
inizi a emettere.
Curiosamente se ritardo rows
di 1 secondi, quindi questo codice funziona.
Quindi la domanda è: come posso raggruppare un numero arbitrario di righe utilizzando la mia funzione di limite?
Perché non si consumano le GroupedObservables reattivo invece di raccogliere i loro valori in elenchi? – akarnokd
Perché ho bisogno di tutti i valori di un singolo gruppo per costruire un singolo oggetto e questo singolo oggetto verrà ulteriormente spinto per l'elaborazione reattiva. Il caso d'uso effettivo è che ho una relazione genitore-figlio in entità DB, faccio l'SQL che unisce queste tabelle, quindi una riga per ogni bambino è emmitata. Quindi voglio raggruppare child dello stesso genitore in una raccolta, creare un genitore con i suoi childs ed emettere questo oggetto genitore. –
"Ho bisogno di tutti i valori di un singolo gruppo per costruire un singolo oggetto" - mi sembra contraddittorio. Se hai bisogno di tutti i valori di ogni gruppo, perché vuoi tagliare i gruppi in parti? A proposito, perché ti connetti prima di iscriverti, l'intera faccenda si precipita e ottieni valori vuoti. Avresti bisogno di una connessione automatica ma questa funzione è attualmente all'esame in RxJava. – akarnokd