我有一個Spring集成DSL流,它從一個rest API中提取數據,將其轉換併發送到不同的rest API。彈簧集成隊列錯誤處理
獲取數據後,它會將消息發送到隊列通道,進行其餘處理。當隊列正在工作時,原始線程將進入並獲取更多數據。
我遇到的問題是,從隊列中拋出的任何錯誤在處理完所有數據之後纔會處理,但是我希望它立即停止處理並立即拋出錯誤,因爲整個過程可能會花費很長一段時間,但我希望它停止發現第一個錯誤。
網關:
@MessagingGateway(errorChannel = "syncErrorChannel")
@Service
public interface CrmGateway {
@Gateway(requestChannel = "departmentSyncInput", replyChannel = "departmentSyncOutput")
@Payload("new String()")
Object syncDepartments();
}
流量:
/**
* Fetches data from the source api and passes it on to the split channel to process it If the
* response indicates it has more data to fetch then it is also loaded
*
* @return {@link IntegrationFlow}
*/
@Bean
IntegrationFlow sync() {
return IntegrationFlows
.from("departmentSyncInput")
.handle(this::fetchDomain)
.enrichHeaders(s -> s.headerExpressions(h -> h.put("nextLink", "payload.getNext()")))
.routeToRecipients(r -> r
.recipient("departmentSplitChannel")
.recipient(
"departmentSyncInput",
p -> p.getPayload() instanceof Wrapper
&& ((Wrapper) p.getPayload()).getNext() != null
))
.get();
}
/**
* Split data from the api into individual models and send them to the target service
*
* @return {@link IntegrationFlow}
*/
@Bean
IntegrationFlow split() {
return IntegrationFlows
.from("departmentSplitChannel")
.transform(Wrapper.class, Wrapper::getContent)
.split()
.channel(c -> c.executor(Executors.newScheduledThreadPool(100)))
.enrichHeaders(h -> h.header("errorChannel", "syncErrorChannel"))
.handle((payload, headers) -> log("Syncing", payload, payload))
.transform(Department.class, transformer)
// exception happens here
.handle(DepartmentDTO.class, (payload, headers) -> service.upsertDepartment(payload))
.handle((payload, headers) -> log("Synced", payload, payload))
.aggregate()
.get();
}
錯誤處理程序:
@Bean
IntegrationFlow errorHandler() {
return IntegrationFlows
.from("syncErrorChannel")
.handle(Exception.class, (payload, headers) -> {
payload.printStackTrace();
return payload;
})
.get();
}
我也使用IntegrationFlows.from("errorChannel")
具有相同的結果嘗試。
我也嘗試過使用Future
,它的行爲相同,因此當我打電話給get()
時,出現錯誤,但這仍然在最後發生。
感謝您的任何幫助。
謝謝,這似乎已經做到了。使用'c - > c.executor()'與隊列不同?這是我從XML轉換而來的DSL,但我以前沒有使用SI DSL。 –
好吧,從高處來看,它確實是一個隊列,因爲如果沒有空閒線程來處理它,它會爲執行程序提供一個任務,將它們存儲在內部隊列中。我在談論的是QueueChannel針對投票特定的行爲 –