2014-12-09 17 views
6

Mi chiedevo se posso usare il RxJava biblioteca al fine di aggiungere un po 'di concorrenza nel seguente caso d'uso:Aggiunta di un pool di thread in un RxJava flusso

  • Fetch in sequenza una colonna String da un esistente ResultSet con un costume Observable (qualcosa come ResultSetObservable.create(resultSet))
  • Invocare un servizio Web per ciascuno di questi valori (con un'istanza InvokeWebServiceFunc1<String, Pair<String, Integer>>(), per esempio), al fine di recuperare alcuni statistiques legate alla String (notare che il String nella Pair corrisponde a quello immesso in input)
  • Stampa tutto in un formato CSV (con ExportAsCSVAction1<Pair<String, Integer>>(PrintStream printStream)).

Così qui è quello che ho:

ResultSetObservable.create(resultSet) 
    .map(new InvokeWebServiceFunc1<String, Pair<String, Integer>>()) 
    .subscribe(new ExportAsCSVAction1<Pair<String, Integer>>(System.out)); 

Funziona bene, ma come il servizio Web potrebbe richiedere del tempo per alcuni degli input String, voglio aggiungere un po 'di concorrenza da avendo un filo pool come comportamento per la mappatura (di 10 thread per esempio) ma I è necessario il ExportAsCSVAction0 a essere chiamato nello stesso thread (e in realtà il thread corrente sarebbe perfetto).

Potete aiutarmi per favore? Non riesco a capire se utilizzare lo schema toBlocking().forEach() sia la soluzione giusta qui e non capisco dove usare lo Schedulers.from(fixedThreadPool) (nello observeOn() o nello subscribeOn()).

Grazie per qualsiasi aiuto!

risposta

19

L'ho trovato da solo!

package radium.rx; 

import java.util.List; 
import java.util.Arrays; 
import java.util.Random; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 
import rx.Observable; 
import rx.schedulers.Schedulers; 

public class TryRx { 

    public static Random RANDOM = new Random(); 

    public static void main(String[] arguments) throws Throwable { 
     final List<Integer> inputs = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12); 
     final ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2); 

     Iterable<Integer> outputs = Observable.<Integer>from(inputs) 
       .flatMap((Integer input) -> deferHeavyWeightStuff(input).subscribeOn(Schedulers.from(threadPoolExecutor))) 
       .toBlocking() 
      .toIterable(); 

     for (Integer output : outputs) { 
      System.out.println(output); 
     } 

     threadPoolExecutor.shutdown(); 
    } 

    public static void sleepQuietly(int duration, TimeUnit unit) { 
     try { 
      Thread.sleep(unit.toMillis(duration)); 
     } catch (InterruptedException e) { 

     } 
    } 

    public static Observable<Integer> deferHeavyWeightStuff(final int input) { 
     return Observable.defer(() -> Observable.just(doHeavyWeightStuff(input))); 
    } 

    public static int randomInt(int min, int max) { 
     return RANDOM.nextInt((max - min) + 1) + min; 
    } 

    public static int doHeavyWeightStuff(int input) { 
     sleepQuietly(randomInt(1, 5), TimeUnit.SECONDS); 
     int output = (int) Math.pow(input, 2); 
     return output; 
    } 

} 

Cheers!

+0

Grazie mille. Tuttavia, l'output sembra essere in un ordine diverso dall'input. Qualsiasi indizio su come risolverlo. – user2849678

+1

Ci si deve aspettare durante la parallelizzazione. – spydon

Problemi correlati