2015-06-22 8 views
5

Ho un osservabile che emette le stringhe e desidero raggrupparle per primo carattere. E 'facile farlo con groupBy così:RxJava: Come raggruppare un numero arbitrario di elementi utilizzando la mia funzione di confine

Observable<String> rows = Observable.just("aa", "ab", "ac", "bb", "bc", "cc"); 

Observable<List<String>> groupedRows = rows.groupBy(new Func1<String, Character>() { 
    public Character call(String row) { 
    return row.charAt(0); 
    } 
}).flatMap(new Func1<GroupedObservable<Character, String>, Observable<List<String>>>() { 
    public Observable<List<String>> call(GroupedObservable<Character, String> group) { 
    return group.toList(); 
    } 
}); 

groupedRows.toBlocking().forEach(new Action1<List<String>>() { 
    public void call(List<String> group) { 
    System.out.println(group); 
    } 
}); 

// Output: 
// [aa, ab, ac] 
// [bb, bc] 
// [cc] 

Ma non è un bene per i miei scopi, perché groupBy completa solo ogni gruppo quando la sorgente emette osservabili onComplete. Quindi, se ho un sacco di righe, saranno completamente raccolte nella memoria e solo nell'ultima riga "arrossate" e scritte in output.

Ho bisogno di qualcosa come l'operatore buffer, ma con la mia funzione che denota il limite di ogni gruppo. Ho implementato in quel modo (sapendo che le righe sono sempre ordinati alphabeticaly):

Observable<String> rows = Observable.just("aa", "ab", "ac", "bb", "bc", "cc"); 

ConnectableObservable<String> connectableRows = rows.publish(); 

Observable<String> boundarySelector = connectableRows.filter(new Func1<String, Boolean>() { 
    private char lastChar = 0; 

    public Boolean call(String row) { 
    char currentChar = row.charAt(0); 
    boolean isNewGroup = lastChar != 0 && (currentChar != lastChar); 
    lastChar = currentChar; 
    return isNewGroup; 
    } 
}); 

Observable<List<String>> groupedRows = connectableRows.buffer(boundarySelector); 

connectableRows.connect(); 

groupedRows.toBlocking().forEach(new Action1<List<String>>() { 
    public void call(List<String> group) { 
    System.out.println(group); 
    } 
}); 

// Output: 
// [] 
// [] 
// [] 

Non funziona, perché boundarySelector è "mangiare" le righe, e penso che sia strano perché ho utilizzate in modo specifico ConnectableObservable per indicare che Ho bisogno di due abbonati (boundarySelector e groupedRows) prima che l'rows inizi a emettere.

Curiosamente se ritardo rows di 1 secondi, quindi questo codice funziona.

Quindi la domanda è: come posso raggruppare un numero arbitrario di righe utilizzando la mia funzione di limite?

+0

Perché non si consumano le GroupedObservables reattivo invece di raccogliere i loro valori in elenchi? – akarnokd

+0

Perché ho bisogno di tutti i valori di un singolo gruppo per costruire un singolo oggetto e questo singolo oggetto verrà ulteriormente spinto per l'elaborazione reattiva. Il caso d'uso effettivo è che ho una relazione genitore-figlio in entità DB, faccio l'SQL che unisce queste tabelle, quindi una riga per ogni bambino è emmitata. Quindi voglio raggruppare child dello stesso genitore in una raccolta, creare un genitore con i suoi childs ed emettere questo oggetto genitore. –

+0

"Ho bisogno di tutti i valori di un singolo gruppo per costruire un singolo oggetto" - mi sembra contraddittorio. Se hai bisogno di tutti i valori di ogni gruppo, perché vuoi tagliare i gruppi in parti? A proposito, perché ti connetti prima di iscriverti, l'intera faccenda si precipita e ottieni valori vuoti. Avresti bisogno di una connessione automatica ma questa funzione è attualmente all'esame in RxJava. – akarnokd

risposta

1
Observable<Integer> source = Observable.range(0, 100); 

source 
.groupBy(k -> k/10) 
.publish(groups -> groups 
     .map(g -> Pair.of(g.getKey(), g.takeUntil(groups))) 
     .flatMap(kv -> 
      kv.second 
      .doOnNext(v -> System.out.println(kv.first + " value " + v)) 
      .doOnCompleted(() -> System.out.println(kv.first + " done")) 
     )) 
.subscribe() 
; 
0

trovato un modo per farlo utilizzando buffer:

Observable<String> rows = Observable.just("aa", "ab", "ac", "bb", "bc", "cc"); 

ConnectableObservable<String> connectableRows = rows.publish(); 

Observable<String> boundarySelector = connectableRows.filter(new Func1<String, Boolean>() { 
    private char lastChar = 0; 

    public Boolean call(String row) { 
     char currentChar = row.charAt(0); 
     boolean isNewGroup = lastChar != 0 && (currentChar != lastChar); 
     lastChar = currentChar; 
     return isNewGroup; 
    } 
}); 

Observable<List<String>> groupedRows = connectableRows 
     .refCount() 
     .buffer(boundarySelector); 

groupedRows.toBlocking().forEach(new Action1<List<String>>() { 
    public void call(List<String> group) { 
     System.out.println(group); 
    } 
}); 

// Output: 
// [aa, ab, ac] 
// [bb, bc] 
// [cc] 
Problemi correlati