考慮執行其中JdbcRepositoryHandler
(實施MessageHandler
)可能偵聽外部事件(例如,CancelRunEvent
)的功能的實現。如何中斷或取消Spring集成Java DSL流?
我想我會使用Spring'ApplicationEvent
支持通過REST控制器端點發布事件。我想我會讓上述處理程序執行ApplicationListener
來收聽特定事件?
的問題是:如果處理程序與消息飽和它需要處理,我將如何信號可能已經上游例如發出的所有後續消息終止時,從一個FileSplitter
?
雖然我可以容易地構建一個條件調用負責的方法例如,用於持久性操作前應檢查(基於一些狀態下,從接收到的CancelRunEvent
),如何我完全中斷流?
爲了便於說明,想像這樣一個流程:
@Bean
protected IntegrationFlow s3ChannelFlow() {
// @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
// @formatter:off
return IntegrationFlows
.from(s3Channel())
.enrichHeaders(h -> h.headerFunction(RunStats.FILE_TOKEN, f -> UUID.randomUUID().toString()))
.channel(runStatsChannel())
.transform(new FileToInputStreamTransformer())
.split(new FileSplitter())
.transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
.publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
.get();
// @formatter:on
}
@Bean
protected IntegrationFlow persistenceSubFlow() {
// @formatter:off
return f -> f
// @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
.aggregate(a -> a
.correlationStrategy(new HeaderAttributeCorrelationStrategy(RunStats.FILE_TOKEN))
.releaseStrategy(new MessageCountReleaseStrategy(persistenceBatchSize))
.sendPartialResultOnExpiry(true)
.expireGroupsUponCompletion(true)
.groupTimeoutExpression(persistenceBatchReleaseTimeoutMillis)
)
.handle(new JdbcRepositoryHandler(typeSupport, metricRegistry, runStatsRepository));
// @formatter:on
}
處理程序本身不需要實現ApplicationListener,你是對的。只是想知道我可以在哪裏註冊該流程的聽衆?流程的起源是一個HTTP端點。請求參數提供另一個MessageProducingHandler,該MessageProducingHandler從S3存儲段中讀取並同步到本地文件系統,並且其輸出通道是上面流中的S3通道。 –
不確定你的意思是「用流程註冊監聽者」;它不是流程的一部分,而是控制它 - 請參閱[EIP控制總線](http://www.enterpriseintegrationpatterns.com/patterns/messaging/ControlBus.html)和[Spring Integration的實現](http:// docs .spring.io /彈簧整合/參考/ HTML /系統管理-chapter.html#控制總線)。你的聽衆可以直接停止你的S3消息製作者,或者發送一條消息到控制總線來這樣做。 –
謝謝加里揭示這一點和提示。 「通知流程終止處理」可能是描述我的意圖的更好方式。我在春天每天都會發現一些令人高興的東西。 –