2015-12-09 12 views
8

Sto tentando di creare eventi inviati dal server con Spring 4 (tomcat 7, servlet-apie 3.0.1).Spring sseEmitter, gli eventi non vengono inviati subito dopo l'invio del metodo è stato chiamato

Il problema è che il mio Events non viene inviato subito dopo l'invio del metodo. Vengono tutti contemporaneamente (con lo stesso timestamp) al client solo dopo il timeout di SseEmitter, con l'evento di errore EventSource. E poi il cliente sta cercando di riconnettersi. Qualche idea su cosa sta succedendo?

ho creato un semplice servizio:

@RequestMapping(value = "subscribe", method = RequestMethod.GET) 
public SseEmitter subscribe() throws IOException { 
    final SseEmitter emitter = new SseEmitter(); 
    Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() { 
     @Override 
     public void run() { 
      try { 
       emitter.send(SseEmitter.event().data("Thread writing: " + Thread.currentThread()).name("ping")); 
      } catch (Exception e) { 
      } 
     } 
    } , 1000, 1000, TimeUnit.MILLISECONDS); 
    return emitter; 
} 

con codice client:

sse = new EventSource(urlBuilder(base, url)); 
sse.addEventListener('ping', function (event) { 
    dfd.notify(event); 
}); 

sse.addEventListener('message', function(event){ 
    dfd.notify(event); 
}); 

sse.addEventListener('close', function(event){ 
    dfd.notify(event); 
}); 

sse.onerror = function (error) { 
    console.log(error); 
}; 

sse.onmessage = function (event){ 
    dfd.notify(event); 
}; 

App codice initalizer

public class WebAppInitializer implements WebApplicationInitializer { 
    @Override 
    public void onStartup(ServletContext servletContext) throws ServletException { 
     AnnotationConfigWebApplicationContext ctx = new AnnotationConfigWebApplicationContext(); 
     ctx.register(AppConfig.class); 
     ctx.setServletContext(servletContext); 
     ctx.refresh(); 

     ServletRegistration.Dynamic dynamic = servletContext.addServlet("dispatcher", new DispatcherServlet(ctx)); 
     dynamic.setAsyncSupported(true); 
     dynamic.addMapping("/api/*"); 
     dynamic.setLoadOnStartup(1); 
     dynamic.setMultipartConfig(ctx.getBean(MultipartConfigElement.class)); 

     javax.servlet.FilterRegistration.Dynamic filter = servletContext 
       .addFilter("StatelessAuthenticationFilter", 
         ctx.getBean("statelessAuthenticationFilter", StatelessAuthenticationFilter.class)); 
     filter.setAsyncSupported(true); 
     filter.addMappingForUrlPatterns(null, false, "/api/*"); 

     filter = servletContext.addFilter("HibernateSessionRequestFilter", 
       ctx.getBean("hibernateSessionRequestFilter", HibernateSessionRequestFilter.class)); 
     filter.setAsyncSupported(true); 
     filter.addMappingForUrlPatterns(null, false, "/api/user/*"); 
    } 
} 

AppConfig.java

@Configuration 
@ComponentScan("ru.esoft.workflow") 
@EnableWebMvc 
@PropertySource({"classpath:mail.properties", "classpath:fatclient.properties"}) 
@EnableAsync 
@EnableScheduling 
public class AppConfig extends WebMvcConfigurerAdapter { 
... 
} 

Immagine della mia registro del client: enter image description here

+0

Ho avuto un problema simile. Tuttavia, dopo aver letto https://jira.spring.io/browse/SPR-14578 l'ho provato con un 'Thread' e' Thread.start() ', e sembra che il problema sia scomparso, ma non lo so davvero perchè. Ad ogni modo, penso che sia strano se funzionasse in combinazione con RxJava, anche se è certamente un approccio migliore. – user140547

+0

Per questo ho creato un miglioramento JIRA (https://jira.spring.io/browse/SPR-15299) poiché ho lo stesso problema. Vediamo come va ... – cristi

+0

Nel mio caso si è scoperto che IIS stava tra il browser e Tomcat era il problema (segui il mio precedente commento sul bug segnalato e troverai la spiegazione completa). – cristi

risposta

2

mi sono imbattuto in questo io stesso durante il test SSEEmitters. Da tutto ciò che ho letto online, gli SSEEmitter sono pensati per essere utilizzati in concomitanza con alcune implementazioni di Reactive Streams, come RxJava. È un po 'complesso, ma sicuramente funziona. L'idea è di creare l'emettitore e un osservabile e di abbonare quest'ultimo a un editore. Il server di pubblicazione esegue il suo comportamento in un thread separato, notificando l'Observable quando l'output è pronto e l'osservabile attiva emitter.send. Ecco un frammento di esempio che dovrebbe fare quello che vuoi:

@RequestMapping("/whatever") 
public SseEmitter index( 
    SseEmitter emitter = new SseEmitter(); 
    Publisher<String> responsePublisher = someResponseGenerator.getPublisher(); 
    Observable<String> responseObservable = RxReactiveStreams.toObservable(responsePublisher); 

    responseObservable.subscribe(
     str -> { 
      try { 
       emitter.send(str); 
      } catch (IOException ex) { 
       emitter.completeWithError(ex); 
      } 
     }, 
     error -> { 
      emitter.completeWithError(error); 
     }, 
     emitter::complete 
     ); 

     return emitter; 
}; 

Qui è un server di pubblicazione corrispondente:

public class SomeResponseGenerator {  
    public Publisher<String> getPublisher() { 
     Publisher<String> pub = new Publisher<String>() { 
      @Override 
      public void subscribe(Subscriber subscriber) { 
       Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() { 
        @Override 
        public void run() { 
         subscriber.onNext("Thread writing: " + Thread.currentThread().getName()); 
        } 
       }, 1000, 1000, TimeUnit.MILLISECONDS); 
      } 
     }; 

     return pub; 
    } 
} 

Ci sono alcuni esempi di questo modello in linea here e here, e si possono trovare altro da Googling 'RxJava SseEmitter'. Ci vuole del tempo per ingannare le interazioni Reactive Stream/RxJava/SseEmitter, ma una volta che lo fai è piuttosto elegante. Spero che questo ti metta sulla giusta strada!

0

Mentre l'altra risposta è corretta, se si desidera gestire da soli si può chiamare:

emitter.complete() 
Problemi correlati