2015-09-24 17 views
5

Credo che questa domanda non sia un duplicato di , ma probabilmente correlata a Jersey Server-Sent Events - write to broken connection does not throw exception.Broadcasting con Jersey SSE: Rileva connessione chiusa

In chapter 15.4.2 della documentazione Jersey, il SseBroadcaster è descritto:

Tuttavia, lo SseBroadcaster identifica internamente che gestisce anche client si disconnette. Quando un client chiude la connessione, l'emittente lo rileva e rimuove la connessione obsoleta dalla raccolta interna degli EventOutput registrati, oltre a liberare tutte le risorse lato server associate alla connessione obsoleta.

Non posso confermarlo. Nel seguente banco di prova, vedo il sottotitolo SseBroadcaster del metodo onClose() mai chiamato: non quando lo EventInput è chiuso e non quando viene trasmesso un altro messaggio.

public class NotificationsResourceTest extends JerseyTest { 
    final static Logger log = LoggerFactory.getLogger(NotificationsResourceTest.class); 

    final static CountingSseBroadcaster broadcaster = new CountingSseBroadcaster(); 

    public static class CountingSseBroadcaster extends SseBroadcaster { 
     final AtomicInteger connectionCounter = new AtomicInteger(0); 

     public EventOutput createAndAttachEventOutput() { 
      EventOutput output = new EventOutput(); 
      if (add(output)) { 
       int cons = connectionCounter.incrementAndGet(); 
       log.debug("Active connection count: "+ cons); 
      } 
      return output; 
     } 

     @Override 
     public void onClose(final ChunkedOutput<OutboundEvent> output) { 
      int cons = connectionCounter.decrementAndGet(); 
      log.debug("A connection has been closed. Active connection count: "+ cons); 
     } 

     @Override 
     public void onException(final ChunkedOutput<OutboundEvent> chunkedOutput, final Exception exception) { 
      log.trace("An exception has been detected", exception); 
     } 

     public int getConnectionCount() { 
      return connectionCounter.get(); 
     } 
    } 

    @Path("notifications") 
    public static class NotificationsResource { 

     @GET 
     @Produces(SseFeature.SERVER_SENT_EVENTS) 
     public EventOutput subscribe() { 
      log.debug("New stream subscription"); 

      EventOutput eventOutput = broadcaster.createAndAttachEventOutput(); 
      return eventOutput; 
     } 
    } 

    @Override 
    protected Application configure() { 
     ResourceConfig config = new ResourceConfig(NotificationsResource.class); 
     config.register(SseFeature.class); 

     return config; 
    } 


    @Test 
    public void test() throws Exception { 
     // check that there are no connections 
     assertEquals(0, broadcaster.getConnectionCount()); 

     // connect subscriber 
     log.info("Connecting subscriber"); 
     EventInput eventInput = target("notifications").request().get(EventInput.class); 
     assertFalse(eventInput.isClosed()); 

     // now there are connections 
     assertEquals(1, broadcaster.getConnectionCount()); 

     // push data 
     log.info("Broadcasting data"); 
     String payload = UUID.randomUUID().toString(); 
     OutboundEvent chunk = new OutboundEvent.Builder() 
       .mediaType(MediaType.TEXT_PLAIN_TYPE) 
       .name("message") 
       .data(payload) 
       .build(); 
     broadcaster.broadcast(chunk); 

     // read data 
     log.info("Reading data"); 
     InboundEvent inboundEvent = eventInput.read(); 
     assertNotNull(inboundEvent); 
     assertEquals(payload, inboundEvent.readData()); 

     // close subscription 
     log.info("Closing subscription"); 
     eventInput.close(); 
     assertTrue(eventInput.isClosed()); 

     // at this point, the subscriber has disconnected itself, 
     // but jersey doesnt realise that 
     assertEquals(1, broadcaster.getConnectionCount()); 

     // wait, give TCP a chance to close the connection 
     log.debug("Sleeping for some time"); 
     Thread.sleep(10000); 

     // push data again, this should really flush out the not-connected client 
     log.info("Broadcasting data again"); 
     broadcaster.broadcast(chunk); 
     Thread.sleep(100); 

     // there is no subscriber anymore 
     assertEquals(0, broadcaster.getConnectionCount()); // FAILS! 
    } 
} 

Forse JerseyTest non è un buon modo per testare questo. In una configurazione meno ... clinica, in cui viene utilizzato un JavaScript EventSource, viene chiamato onClose(), ma solo dopo che un messaggio è stato trasmesso sulla connessione precedentemente chiusa.

Cosa sto sbagliando?

Perché il codice SseBroadcaster non rileva la chiusura della connessione da parte del client?

Follow-up

ho trovato JERSEY-2833 che è stata respinta con Opere come progettato:

Secondo la documentazione Jersey nel capitolo SSE (https://jersey.java.net/documentation/latest/sse.html) in 15.4.1 è ha detto che Jersey non chiude esplicitamente la connessione, è responsabilità del metodo della risorsa o del client.

Che cosa significa esattamente? La risorsa dovrebbe imporre un timeout e uccidere tutte le connessioni attive e chiuse per cliente?

risposta

0

credo che potrebbe essere migliore per impostare un timeout sul vostra risorsa e uccidere solo quella connessione, ad esempio:

@Path("notifications") 
public static class NotificationsResource { 

    @GET 
    @Produces(SseFeature.SERVER_SENT_EVENTS) 
    public EventOutput subscribe() { 
     log.debug("New stream subscription"); 

     EventOutput eventOutput = broadcaster.createAndAttachEventOutput(); 
     new Timer().schedule(new TimerTask() 
     { 
      @Override public void run() 
      { 
       eventOutput.close() 
      } 
     }, 10000); // 10 second timeout 
     return eventOutput; 
    } 
} 
+0

Grazie per l'input. Sono sicuro che funzionerebbe, e ha i suoi benefici, ma non è quello che sto cercando. La mia soluzione attuale è di inviare un commento SSE ogni x secondi, che alla fine eliminerà una connessione interrotta. La domanda originale rimane però - perché Jersey non rileva la connessione chiusa e il report è di per sé di conseguenza? – Hank

0

Im chiedendo se per sottoclasse si può avere modificato il comportamento.

@Override 
    public void onClose(final ChunkedOutput<OutboundEvent> output) { 
     int cons = connectionCounter.decrementAndGet(); 
     log.debug("A connection has been closed. Active connection count: "+ cons); 
    } 

In questo non si chiude ChunkedOutput in modo che non rilasci la connessione. Questo potrebbe essere il problema?

+0

No, questo non ha niente a che fare con questo. In realtà, è il contrario, viene chiamato 'onClose()', quando 'OutboundEvent' viene chiuso dal Broadcaster. – Hank

0

Nella documentazione del costruttore org.glassfish.jersey.media.sse.SseBroadcaster.SseBroadcaster(), si dice:

Crea una nuova istanza. Se questo costruttore viene chiamato da una sottoclasse, presuppone che il motivo per cui esiste la sottoclasse sia l'implementazione dei metodi onClose(org.glassfish.jersey.server.ChunkedOutput) e onException(org.glassfish.jersey.server.ChunkedOutput, Exception), quindi aggiunge l'istanza appena creata come listener.Per evitare ciò, le sottoclassi possono chiamare SseBroadcaster(Class) passando la loro classe come argomento.

Quindi non si dovrebbe lasciare costruttore di default e provare l'attuazione vostro costruttore invocando eccellente con la classe:

public CountingSseBroadcaster(){ 
    super(CountingSseBroadcaster.class); 
} 
Problemi correlati