Mi chiedevo se posso usare il RxJava biblioteca al fine di aggiungere un po 'di concorrenza nel seguente caso d'uso:Aggiunta di un pool di thread in un RxJava flusso
- Fetch in sequenza una colonna
String
da un esistenteResultSet
con un costumeObservable
(qualcosa comeResultSetObservable.create(resultSet)
) - Invocare un servizio Web per ciascuno di questi valori (con un'istanza
InvokeWebServiceFunc1<String, Pair<String, Integer>>()
, per esempio), al fine di recuperare alcuni statistiques legate allaString
(notare che ilString
nellaPair
corrisponde a quello immesso in input) - Stampa tutto in un formato CSV (con
ExportAsCSVAction1<Pair<String, Integer>>(PrintStream printStream)
).
Così qui è quello che ho:
ResultSetObservable.create(resultSet)
.map(new InvokeWebServiceFunc1<String, Pair<String, Integer>>())
.subscribe(new ExportAsCSVAction1<Pair<String, Integer>>(System.out));
Funziona bene, ma come il servizio Web potrebbe richiedere del tempo per alcuni degli input String
, voglio aggiungere un po 'di concorrenza da avendo un filo pool come comportamento per la mappatura (di 10 thread per esempio) ma I è necessario il ExportAsCSVAction0
a essere chiamato nello stesso thread (e in realtà il thread corrente sarebbe perfetto).
Potete aiutarmi per favore? Non riesco a capire se utilizzare lo schema toBlocking().forEach()
sia la soluzione giusta qui e non capisco dove usare lo Schedulers.from(fixedThreadPool)
(nello observeOn()
o nello subscribeOn()
).
Grazie per qualsiasi aiuto!
Grazie mille. Tuttavia, l'output sembra essere in un ordine diverso dall'input. Qualsiasi indizio su come risolverlo. – user2849678
Ci si deve aspettare durante la parallelizzazione. – spydon