2016-04-13 24 views
8

Posso utilizzare Retrofit + RxJava per ascoltare un flusso infinito? Ad esempio il flusso di Twitter. Quello che ho è questa:Android Retrofit 2 + RxJava: ascolta il flusso infinito

public interface MeetupAPI { 
    @GET("http://stream.meetup.com/2/rsvps/") 
    Observable<RSVP> getRSVPs(); 
} 

MeetupAPI api = new Retrofit.Builder() 
      .baseUrl(MeetupAPI.RSVP_API) 
      .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) 
      .addConverterFactory(GsonConverterFactory.create()) 
      .build() 
      .create(MeetupAPI.class); 

api.getRSVPs() 
     .subscribeOn(Schedulers.newThread()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(rsvp -> Log.d(TAG, "got rsvp"), 
       error -> Log.d(TAG, "error: " + error), 
       () -> Log.d(TAG, "onComplete")); 

ma il "onComplete" viene richiamato dopo il primo oggetto è stato analizzato. C'è un modo per dire a Retrofit di rimanere aperto fino a nuovo avviso?

risposta

9

Ecco la mia soluzione:

È possibile utilizzare l'annotazione @Streaming:

public interface ITwitterAPI { 

    @GET("/2/rsvps") 
    @Streaming 
    Observable<ResponseBody> twitterStream(); 
} 

ITwitterAPI api = new Retrofit.Builder() 
      .baseUrl("http://stream.meetup.com") 
      .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) 
      .build().create(ITwitterAPI.class); 

Con @Streaming possiamo ottenere ingresso RAW da ResponseBody.

Ecco la mia funzione di avvolgere il corpo diviso per linee con gli eventi:

public static Observable<String> events(BufferedSource source) { 
    return Observable.create(new Observable.OnSubscribe<String>() { 
     @Override 
     public void call(Subscriber<? super String> subscriber) { 
      try { 
       while (!source.exhausted()) { 
        subscriber.onNext(source.readUtf8Line()); 
       } 
       subscriber.onCompleted(); 
      } catch (IOException e) { 
       e.printStackTrace(); 
       subscriber.onError(e); 
      } 
     } 
    }); 
} 

e il risultato utilizzo:

api.twitterStream() 
    .flatMap(responseBody -> events(responseBody.source())) 
    .subscribe(System.out::println); 

upd di smettere con grazia

Quando abbiamo cancellarsi, chiude retrofit inputstream. Ma impossibile rilevare inputstream chiuso o non da inputstream stesso, quindi solo modo - prova a leggere dallo stream - otteniamo un'eccezione con il messaggio Socket closed. Possiamo interpretare questa eccezione come chiusura:

 @Override 
     public void call(Subscriber<? super String> subscriber) { 
      boolean isCompleted = false; 
      try { 
       while (!source.exhausted()) { 
        subscriber.onNext(source.readUtf8Line()); 
       } 
      } catch (IOException e) { 
       if (e.getMessage().equals("Socket closed")) { 
        isCompleted = true; 
        subscriber.onCompleted(); 
       } else { 
        throw new UncheckedIOException(e); 
       } 
      } 
      //if response end we get here 
      if (!isCompleted) { 
       subscriber.onCompleted(); 
      } 
     } 

E se la connessione chiusa a causa fine la risposta, non abbiamo alcuna eccezione. Qui controlla isCompleted. Fammi sapere se ho torto :)

+0

Grazie! Qualche suggerimento su come smettere con garbo di ascoltare lo streaming? – ticofab

Problemi correlati