2015-12-09 197 views
8

我試圖用Spring 4(tomcat 7,servlet-api 3.0.1)發送服務器發送的事件。Spring sseEmitter,發送方法發送後不會立即發送事件

問題是我的Events在方法發送被調用後沒有被髮送。它們都是在SseEmitter超時後與EventSource的錯誤事件同時發生(具有相同的時間戳)。然後客戶端正在嘗試重新連接。任何想法發生了什麼?

我創建了一個簡單的服務:

@RequestMapping(value = "subscribe", method = RequestMethod.GET) 
public SseEmitter subscribe() throws IOException { 
    final SseEmitter emitter = new SseEmitter(); 
    Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() { 
     @Override 
     public void run() { 
      try { 
       emitter.send(SseEmitter.event().data("Thread writing: " + Thread.currentThread()).name("ping")); 
      } catch (Exception e) { 
      } 
     } 
    } , 1000, 1000, TimeUnit.MILLISECONDS); 
    return emitter; 
} 

與客戶端代碼:

sse = new EventSource(urlBuilder(base, url)); 
sse.addEventListener('ping', function (event) { 
    dfd.notify(event); 
}); 

sse.addEventListener('message', function(event){ 
    dfd.notify(event); 
}); 

sse.addEventListener('close', function(event){ 
    dfd.notify(event); 
}); 

sse.onerror = function (error) { 
    console.log(error); 
}; 

sse.onmessage = function (event){ 
    dfd.notify(event); 
}; 

應用initalizer代碼

public class WebAppInitializer implements WebApplicationInitializer { 
    @Override 
    public void onStartup(ServletContext servletContext) throws ServletException { 
     AnnotationConfigWebApplicationContext ctx = new AnnotationConfigWebApplicationContext(); 
     ctx.register(AppConfig.class); 
     ctx.setServletContext(servletContext); 
     ctx.refresh(); 

     ServletRegistration.Dynamic dynamic = servletContext.addServlet("dispatcher", new DispatcherServlet(ctx)); 
     dynamic.setAsyncSupported(true); 
     dynamic.addMapping("/api/*"); 
     dynamic.setLoadOnStartup(1); 
     dynamic.setMultipartConfig(ctx.getBean(MultipartConfigElement.class)); 

     javax.servlet.FilterRegistration.Dynamic filter = servletContext 
       .addFilter("StatelessAuthenticationFilter", 
         ctx.getBean("statelessAuthenticationFilter", StatelessAuthenticationFilter.class)); 
     filter.setAsyncSupported(true); 
     filter.addMappingForUrlPatterns(null, false, "/api/*"); 

     filter = servletContext.addFilter("HibernateSessionRequestFilter", 
       ctx.getBean("hibernateSessionRequestFilter", HibernateSessionRequestFilter.class)); 
     filter.setAsyncSupported(true); 
     filter.addMappingForUrlPatterns(null, false, "/api/user/*"); 
    } 
} 

AppConfig.java

@Configuration 
@ComponentScan("ru.esoft.workflow") 
@EnableWebMvc 
@PropertySource({"classpath:mail.properties", "classpath:fatclient.properties"}) 
@EnableAsync 
@EnableScheduling 
public class AppConfig extends WebMvcConfigurerAdapter { 
... 
} 

我的客戶端日誌圖片: enter image description here

+0

我也有類似的問題。然而,在閱讀https://jira.spring.io/browse/SPR-14578 後,我用'Thread'和'Thread.start()'試了一下,似乎問題消失了,但我不知道真的爲什麼。無論如何,我認爲它與RxJava一起使用是很奇怪的,儘管它肯定是更好的方法。 – user140547

+0

我爲此創建了一個JIRA改進(https://jira.spring.io/browse/SPR-15299),因爲我有同樣的問題。讓我們看看它是怎麼回事...... – cristi

+0

在我的情況下,事實證明,坐在瀏覽器和Tomcat之間的IIS是問題(按照我之前對所報告的錯誤發表評論,你會發現完整的解釋)。 – cristi

回答

2

我在測試SSEEmitters時遇到了這個問題。從我在線閱讀的所有內容中,SSEEmitters是與Reactive Streams的一些實現結合使用的,例如RxJava。這有點複雜,但它確實有效。這個想法是,你創建發射器和Observable,並將後者訂閱到發佈者。發佈者在一個單獨的線程中執行其行爲,當輸出就緒時通知Observable,並且observable觸發emitter.send。下面是一個例子片斷應該做你想要什麼:

@RequestMapping("/whatever") 
public SseEmitter index( 
    SseEmitter emitter = new SseEmitter(); 
    Publisher<String> responsePublisher = someResponseGenerator.getPublisher(); 
    Observable<String> responseObservable = RxReactiveStreams.toObservable(responsePublisher); 

    responseObservable.subscribe(
     str -> { 
      try { 
       emitter.send(str); 
      } catch (IOException ex) { 
       emitter.completeWithError(ex); 
      } 
     }, 
     error -> { 
      emitter.completeWithError(error); 
     }, 
     emitter::complete 
     ); 

     return emitter; 
}; 

下面是一個對應的發行人:

public class SomeResponseGenerator {  
    public Publisher<String> getPublisher() { 
     Publisher<String> pub = new Publisher<String>() { 
      @Override 
      public void subscribe(Subscriber subscriber) { 
       Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() { 
        @Override 
        public void run() { 
         subscriber.onNext("Thread writing: " + Thread.currentThread().getName()); 
        } 
       }, 1000, 1000, TimeUnit.MILLISECONDS); 
      } 
     }; 

     return pub; 
    } 
} 

有此模型的幾個例子在網上herehere,你可以找到Google更多的'RxJava SseEmitter'。獲取Reactive Streams/RxJava/SseEmitter交互需要一些時間,但一旦完成,它就非常優雅。希望這能讓你走上正確的道路!

0

而對方的回答是正確的,如果你想自己管理它,您可以撥打:

emitter.complete()