2015-05-03 14 views
8

Ho un'applicazione client server (Java EE & Android), comunicazione tramite websockets. La comunicazione funziona e anche il protocollo stesso per l'invio di oggetti come json, che sarà correttamente incartato, serializzato, inviato, deserializzato, scartato e ricostruito. Entrambe le applicazioni utilizzano un altro progetto di libreria con tutte le possibili classi di richieste e risposte.sincronizzazione richiesta/risposta nel contesto nio

Ora per il mio problema: La libreria deve anche implementare una strategia per la comunicazione non bloccante, ma trasparente implementazione richiesta-risposta. Probabilmente non sono il primo a risolvere questo problema, quindi penso che potrebbero esserci delle buone implementazioni là fuori :).

quello che voglio:

// server should sleep 5000ms and then return 3*3 
Future<Integer> f1 = server.put(
    new SleepAndReturnSquareRequest(5000, 3), 
    new FutureCallback<Integer>{ 
    public void onSuccess(Integer square) { 
     runOnUiThread(new Runnable{ 
     // Android Toast show square 
     }); 
    } 

    // impl onFailure 
    } 
); 

Future<Date> f2 = server.put(
    new TimeRequest(), 
    new FutureCallback<Date>{ 
    public void onSuccess(Date date) { 
     // called before other onSuccess 
    } 

    // impl onFailure 
    } 
); 

// e.g. when the activity in android changes I'll cancel all futures. That means no more callbacks and (later) if possible client sends cancellations to the server for long requests. 

Il codice deve inviare un SleepAndReturnRequest e poi un TimeRequest naturalmente non-blocking. La prima richiesta richiede 5 secondi, la seconda richiesta quasi zero millis. Mi aspetto che l'implementazione richiami la seconda richiamata immediatamente dopo la ricezione della risposta, mentre la prima richiamata viene chiamata dopo circa 5 secondi. L'implementazione è responsabile della corrispondenza richiesta-risposta dal lato richiedente.

quello che ho provato e pensato:

guava di Google listenable future sarebbe un buon approccio per il 'lato risponde' io penso, perché è solo un compito in esecuzione su qualsiasi thread, inviando il risultato di nuovo alla fine. Dovrebbe essere più facile

Per il "lato richiedente" ho bisogno di qualche implementazione che aggiunga un identificatore univoco a un messaggio per essere in grado di abbinare la risposta. Spero che tu possa dirmi che alcuni pacchetti fanno quel lavoro.

Grazie per l'aiuto.

// edit:

Credo che la mia domanda è stata fraintesa o non è sufficientemente preciso. Pensa a come implementare GET o POST tramite websocket. Ogni richiesta GET/POST ha una risposta e quindi la connessione viene chiusa. I client si connettono a una porta specifica, il server prende una discussione da un pool di thread, elabora la richiesta e risponde. La corrispondenza tra la richiesta e la risposta penso sia fatta nel livello di trasporto # 4.

Dato che voglio usare WebSockets, devo attuare tale corrispondente nel layer software 7.

Ecco alcuni passi che sto attuazione. K è il tipo di chiave univoco, V è il tipo generico del contenuto di un messaggio. Potrebbe essere una stringa, un flusso di byte, qualunque cosa.

public class Synchronizer<K, V> implements UniqueMessageListener<K, V> { 

    private final ConcurrentMap<K, FutureCallback<V>> callbackMap = new ConcurrentHashMap<>(); 

    private final ListeningExecutorService executor; 

    private final UniqueMessageFactory<K, V> factory; 
    private final UniqueMessageSender<K, V> sender; 
    private UniqueMessageReceiver<K, V> receiver = null; 

    public Synchronizer(
      ListeningExecutorService executor, 
      UniqueMessageFactory<K, V> factory, 
      UniqueMessageSender<K, V> sender 
    ) { 
     this.executor = executor; 
     this.factory = factory; 
     this.sender = sender; 
    } 

    public void register(UniqueMessageReceiver<K, V> receiver) { 

     unregister(); 

     this.receiver = receiver; 
     receiver.addListener(this); 
    } 

    public void unregister() { 
     if(receiver != null) { 
      receiver.removeListener(this); 
      receiver = null; 
     } 
    } 

    public Future<V> put(Message<V> message, final FutureCallback<V> callback) { 
     final UniqueMessage<K, V> uniqueMessage = factory.create(message); 

     final Future<Boolean> sendFuture = sender.send(uniqueMessage); 

     final ListenableFuture<Boolean> listenableSendFuture = 
       JdkFutureAdapters.listenInPoolThread(sendFuture, executor); 


     listenableSendFuture.addListener(
       new Runnable() { 
        @Override 
        public void run() { 
         try { 
          if(listenableSendFuture.get() == true) { 
           callbackMap.put(
             uniqueMessage.getId(), 
             callback 
           ); 
          } else { 
           // maybe try it later again? 
          } 
         } catch(Exception e) { 
          // ... 
         } 
        } 
       }, 
       executor 
     ); 

     // implement cancel 
     return new SynchronizeFuture<>(
       listenableSendFuture, 
       callback 
     ); 
    } 

    @Override 
    public void onReceive(UniqueMessage<K, V> message) { 
     K id = message.getId(); 
     FutureCallback<V> callback; 

     callback = callbackMap.remove(id); 

     if(callback != null) { 
      callback.onSuccess(message.getContent()); 
     } 
    } 
} 

C'è molto da testare per me, ma penso che funzionerà.

+0

C'è qualcosa in grizzly o Netty a lavorare per me? – Aitch

+0

Hai bisogno di qualcosa per generare un transactionId nel lato client, questa è la tua domanda? –

+0

@Mr_Thorynque no.Si tratta di come implementare una sincronizzazione di richieste e risposte tramite websocket. L'utilizzo di un ID transazione unico è abbastanza ovvio. Leggi il mio codice qui sopra. – Aitch

risposta

-1

Si potrebbe provare ProtoBuf-RPC-Pro - ma non è per WebSockets anche se penso che ci sia un progetto su GitHub sostenerlo :)