Sto cercando di utilizzare un CloseableHttpAsyncClient
per leggere da un endpoint, eseguire il marshalling della stringa su un oggetto (utilizzando javax.json) e quindi convertire un array sull'oggetto nei suoi singoli componenti:Converti la stringa in matrice in oggetti osservabili
CloseableHttpAsyncClient client = HttpAsyncClientBuilder.create().setDefaultCredentialsProvider(provider).build();
client.start();
Observable<ObservableHttpResponse> observable = ObservableHttp.createRequest(HttpAsyncMethods.createGet(uri), client)
.toObservable();
Observable<JsonArray> shareable = observable.flatMap(response -> response.getContent().map(bb -> {
String stringVal = new String(bb);
StringReader reader = new StringReader(stringVal);
JsonObject jobj = Json.createReader(reader).readObject();
return jobj.getJsonArray("elements");
})).share();
ho bisogno di ottenere l'Array JSON, quindi filtrare gli oggetti dell'array:
Observable<JsonObject> firstStream = shareable.filter(item -> item.getString("type").equals("TYPE_1"));
Observable<JsonObject> secondStream = shareable.filter(item -> item.getString("type").equals("TYPE_2"));
Observable<JsonObject> thirdStream = shareable.filter(item -> item.getString("type").equals("TYPE_3"));
Come faccio a convertire il Observable<JsonArray>
in una ObservableJsonObject>
?
Poiché è asincrono, non posso usare forEach per creare una sorta di array per il buffer dei dati.
UPDATE:
così guardando utilizzando il CloseableHttpAsyncClient non può essere la soluzione migliore per quello che sto cercando di realizzare - mi sono reso conto di questa mattina (sotto la doccia di tutte le cose), che sto cercando di processo i dati in modo asincrono per poi effettuare chiamate asincrone.
Idealmente, effettuare la chiamata a un CloseableHttpClient (sincronizzazione) e passare i dati all'Osservable per il filtraggio sarebbe un approccio più ideale (non ho bisogno della prima chiamata per gestire più di una chiamata http).
CloseableHttpClient client = HttpClientBuilder.create().setDefaultCredentialsProvider(provider).build();
StringBuffer result = new StringBuffer();
try {
HttpGet request = new HttpGet(uri);
HttpResponse response = client.execute(request);
BufferedReader rd = new BufferedReader(
new InputStreamReader(response.getEntity().getContent()));
String line;
while ((line = rd.readLine()) != null) {
result.append(line);
}
} catch(ClientProtocolException cpe) { } catch(IOException ioe) { }
StringReader reader = new StringReader(result.toString());
JsonObject jobj = Json.createReader(reader).readObject();
JsonArray elements = jobj.getJsonArray("elements");
List<JsonObject> objects = elements.getValuesAs(JsonObject.class);
Observable<JsonObject> shareable = Observable.from(objects).share();
Observable<JsonObject> firstStream = shareable.filter(item -> item.getString("type").equals("TYPE_1"));
Observable<JsonObject> secondStream = shareable.filter(item -> item.getString("type").equals("TYPE_2"));
Observable<JsonObject> thirdStream = shareable.filter(item -> item.getString("type").equals("TYPE_3"));
firstStream.subscribe(record -> {
//connect to SOTS/Facebook and store the results
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Json.createWriter(baos).writeObject(record);
System.out.println(baos.toString());
});
secondStream.subscribe(record -> {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Json.createWriter(baos).writeObject(record);
System.out.println(baos.toString());
});
thirdStream.subscribe(record -> {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Json.createWriter(baos).writeObject(record);
System.out.println(baos.toString());
});
È possibile ottenere nei guai con questo perché flatMap richiede ai suoi ingressi per sostenere contropressione e la tua stat Observable.create ement non supporta contropressione. Io suggerirei per quel po 'di utilizzare Observable.create (nuova AbstractOnSubscribe() {...}) che fa offrire sostegno contropressione. –
Per l'Observable.create si potrebbe anche solo costruire una lista e tornare Observable.from (elenco). –
Quindi, utilizzando la soluzione di cui sopra, ho finito per dover smaterializzare l'osservabile in uscita (restituisce Observerable> così da chiudere.Ma usando dematerialize termina il reso Osservabile