2015-10-27 64 views
1

考慮執行其中JdbcRepositoryHandler(實施MessageHandler)可能偵聽外部事件(例如,CancelRunEvent)的功能的實現。如何中斷或取消Spring集成Java DSL流?

我想我會使用Spring'ApplicationEvent支持通過REST控制器端點發布事件。我想我會讓上述處理程序執行ApplicationListener來收聽特定事件?

的問題是:如果處理程序與消息飽和它需要處理,我將如何信號可能已經上游例如發出的所有後續消息終止時,從一個FileSplitter

雖然我可以容易地構建一個條件調用負責的方法例如,用於持久性操作前應檢查(基於一些狀態下,從接收到的CancelRunEvent),如何我完全中斷流?

爲了便於說明,想像這樣一個流程:

@Bean 
protected IntegrationFlow s3ChannelFlow() { 
    // @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading 
    // @formatter:off 
    return IntegrationFlows 
     .from(s3Channel()) 
     .enrichHeaders(h -> h.headerFunction(RunStats.FILE_TOKEN, f -> UUID.randomUUID().toString())) 
     .channel(runStatsChannel()) 
     .transform(new FileToInputStreamTransformer()) 
     .split(new FileSplitter()) 
     .transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport)) 
     .publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow())) 
     .get(); 
    // @formatter:on 
} 

@Bean 
protected IntegrationFlow persistenceSubFlow() { 
    // @formatter:off 
    return f -> f 
      // @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to 
      .aggregate(a -> a 
        .correlationStrategy(new HeaderAttributeCorrelationStrategy(RunStats.FILE_TOKEN)) 
        .releaseStrategy(new MessageCountReleaseStrategy(persistenceBatchSize)) 
        .sendPartialResultOnExpiry(true) 
        .expireGroupsUponCompletion(true) 
        .groupTimeoutExpression(persistenceBatchReleaseTimeoutMillis) 
      ) 
      .handle(new JdbcRepositoryHandler(typeSupport, metricRegistry, runStatsRepository)); 
    // @formatter:on 
} 

回答

0

這不是完全清楚你的意思是什麼,或者你爲什麼會需要JdbcRepositoryHandler來管理這個,而不是其他一些ApplicationListener

您的流程在s3Channel()上游的某個線程上運行。取決於是什麼,你可以在stop()消息源,並且在當前消息之後不會發出新的消息(或者如果它是多線程的話)。

但是,您可能(可能會)在內存中部分聚集,直到組超時。

+0

處理程序本身不需要實現ApplicationListener,你是對的。只是想知道我可以在哪裏註冊該流程的聽衆?流程的起源是一個HTTP端點。請求參數提供另一個MessageProducingHandler,該MessageProducingHandler從S3存儲段中讀取並同步到本地文件系統,並且其輸出通道是上面流中的S3通道。 –

+0

不確定你的意思是「用流程註冊監聽者」;它不是流程的一部分,而是控制它 - 請參閱[EIP控制總線](http://www.enterpriseintegrationpatterns.com/patterns/messaging/ControlBus.html)和[Spring Integration的實現](http:// docs .spring.io /彈簧整合/參考/ HTML /系統管理-chapter.html#控制總線)。你的聽衆可以直接停止你的S3消息製作者,或者發送一條消息到控制總線來這樣做。 –

+0

謝謝加里揭示這一點和提示。 「通知流程終止處理」可能是描述我的意圖的更好方式。我在春天每天都會發現一些令人高興的東西。 –

相關問題