2015-07-21 15 views
5

Sto cercando di utilizzare un CloseableHttpAsyncClient per leggere da un endpoint, eseguire il marshalling della stringa su un oggetto (utilizzando javax.json) e quindi convertire un array sull'oggetto nei suoi singoli componenti:Converti la stringa in matrice in oggetti osservabili

CloseableHttpAsyncClient client = HttpAsyncClientBuilder.create().setDefaultCredentialsProvider(provider).build(); 

client.start(); 

Observable<ObservableHttpResponse> observable = ObservableHttp.createRequest(HttpAsyncMethods.createGet(uri), client) 
      .toObservable(); 

Observable<JsonArray> shareable = observable.flatMap(response -> response.getContent().map(bb -> { 
     String stringVal = new String(bb); 
     StringReader reader = new StringReader(stringVal); 
     JsonObject jobj = Json.createReader(reader).readObject(); 
     return jobj.getJsonArray("elements"); 
    })).share(); 

ho bisogno di ottenere l'Array JSON, quindi filtrare gli oggetti dell'array:

Observable<JsonObject> firstStream = shareable.filter(item -> item.getString("type").equals("TYPE_1")); 
Observable<JsonObject> secondStream = shareable.filter(item -> item.getString("type").equals("TYPE_2")); 
Observable<JsonObject> thirdStream = shareable.filter(item -> item.getString("type").equals("TYPE_3")); 

Come faccio a convertire il Observable<JsonArray> in una ObservableJsonObject>?

Poiché è asincrono, non posso usare forEach per creare una sorta di array per il buffer dei dati.

UPDATE:

così guardando utilizzando il CloseableHttpAsyncClient non può essere la soluzione migliore per quello che sto cercando di realizzare - mi sono reso conto di questa mattina (sotto la doccia di tutte le cose), che sto cercando di processo i dati in modo asincrono per poi effettuare chiamate asincrone.

Idealmente, effettuare la chiamata a un CloseableHttpClient (sincronizzazione) e passare i dati all'Osservable per il filtraggio sarebbe un approccio più ideale (non ho bisogno della prima chiamata per gestire più di una chiamata http).

CloseableHttpClient client = HttpClientBuilder.create().setDefaultCredentialsProvider(provider).build(); 

    StringBuffer result = new StringBuffer(); 

    try { 
     HttpGet request = new HttpGet(uri); 
     HttpResponse response = client.execute(request); 

     BufferedReader rd = new BufferedReader(
       new InputStreamReader(response.getEntity().getContent())); 

     String line; 
     while ((line = rd.readLine()) != null) { 
      result.append(line); 
     } 
    } catch(ClientProtocolException cpe) { } catch(IOException ioe) { } 

    StringReader reader = new StringReader(result.toString()); 
    JsonObject jobj = Json.createReader(reader).readObject(); 
    JsonArray elements = jobj.getJsonArray("elements"); 

    List<JsonObject> objects = elements.getValuesAs(JsonObject.class); 


    Observable<JsonObject> shareable = Observable.from(objects).share(); 

    Observable<JsonObject> firstStream = shareable.filter(item -> item.getString("type").equals("TYPE_1")); 
    Observable<JsonObject> secondStream = shareable.filter(item -> item.getString("type").equals("TYPE_2")); 
    Observable<JsonObject> thirdStream = shareable.filter(item -> item.getString("type").equals("TYPE_3")); 


    firstStream.subscribe(record -> { 
     //connect to SOTS/Facebook and store the results 
     ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
     Json.createWriter(baos).writeObject(record); 
     System.out.println(baos.toString()); 
    }); 

    secondStream.subscribe(record -> { 
     ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
     Json.createWriter(baos).writeObject(record); 
     System.out.println(baos.toString()); 
    }); 

    thirdStream.subscribe(record -> { 
     ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
     Json.createWriter(baos).writeObject(record); 
     System.out.println(baos.toString()); 
    }); 

risposta

2

provare questo codice:

String myjson = "{\"elements\": [{\"text\":\"Obj1\"},{\"text\":\"Obj2\"}, {\"text\":\"Obj3\"}]}"; 

    Observable.just(myjson) 
      .map(jsonStr -> new StringReader(myjson)) 
      .map(reader -> Json.createReader(reader).readObject()) 
      .map(jobj -> jobj.getJsonArray("elements")) 
      .map(elements -> elements.toArray(new JsonObject[elements.size()])) 
      .flatMap(jsonObjects -> Observable.from(jsonObjects)) 
      .subscribe(
        (jsonObject) -> System.out.println(jsonObject.getString("text")), 
        throwable -> throwable.printStackTrace(), 
        () -> System.out.println("On complete")); 

Risultato:

07-22 12: 19: 22,362 8032-8032/com.mediamanagment.app I/System.out: Obj1
07-22 12: 19: 22,362 8032-8032/com.mediamanagment.app I/System.out: Obj2
07-22 12: 19: 22,362 8032-8032/com .mediamanagment.app I/System.out: obj3

NOTA:
Si dovrebbe usare questa dipendenza:

compile 'org.glassfish:javax.json:1.0.4' 

Invece questo:

compile 'javax.json:javax.json-api:1.0' 

Se si intende utilizzare 'javax.json:javax.json-api:1.0' otterrete javax.json.JsonException: Provider org.glassfish.json.JsonProviderImpl not found al passo :

.map(reader -> Json.createReader(reader).readObject()) 

della stessa, per favore, utilizzare 'org.glassfish:javax.json:1.0.4'

UPDATE: Inoltre, invece di

.flatMap(jsonObjects -> Observable.from(jsonObjects)) 

È possibile utilizzare flatMapIterable( ):

.flatMapIterable(jsonObjects -> jsonObjects) 
0

Dovreste essere in grado di utilizzare un altro flatMap() chiamata invece della map() chiamata che si sta utilizzando. Quindi utilizzare Observable.create() per emettere JsonObject s

Observable<JsonObject> shareable = 
     observable 
       .flatMap(response -> response.getContent() 
         .flatMap(bb -> { 
          String stringVal = new String(bb); 
          StringReader reader = new StringReader(stringVal); 
          JsonObject jobj = Json.createReader(reader).readObject(); 
          JsonArray elements = jobj.getJsonArray("elements"); 
          return Observable.create(subscriber -> { 
           for (int i = 0; i < elements.length(); i++) { 
            subscriber.onNext(elements.getJSONObject(i)); 
           } 
           subscriber.onCompleted(); 
          }); 
         })) 
       .share(); 
+2

È possibile ottenere nei guai con questo perché flatMap richiede ai suoi ingressi per sostenere contropressione e la tua stat Observable.create ement non supporta contropressione. Io suggerirei per quel po 'di utilizzare Observable.create (nuova AbstractOnSubscribe() {...}) che fa offrire sostegno contropressione. –

+0

Per l'Observable.create si potrebbe anche solo costruire una lista e tornare Observable.from (elenco). –

+0

Quindi, utilizzando la soluzione di cui sopra, ho finito per dover smaterializzare l'osservabile in uscita (restituisce Observerable > così da chiudere.Ma usando dematerialize termina il reso Osservabile (non riesco a capire come analizzare il oggetto di nuovo correttamente) – user2266870