2016-12-04 72 views
2

我有一個Spring集成DSL流,它從一個rest API中提取數據,將其轉換併發送到不同的rest API。彈簧集成隊列錯誤處理

獲取數據後,它會將消息發送到隊列通道,進行其餘處理。當隊列正在工作時,原始線程將進入並獲取更多數據。

我遇到的問題是,從隊列中拋出的任何錯誤在處理完所有數據之後纔會處理,但是我希望它立即停止處理並立即拋出錯誤,因爲整個過程可能會花費很長一段時間,但我希望它停止發現第一個錯誤。

網關:

@MessagingGateway(errorChannel = "syncErrorChannel") 
@Service 
public interface CrmGateway { 
    @Gateway(requestChannel = "departmentSyncInput", replyChannel = "departmentSyncOutput") 
    @Payload("new String()") 
    Object syncDepartments(); 
} 

流量:

/** 
    * Fetches data from the source api and passes it on to the split channel to process it If the 
    * response indicates it has more data to fetch then it is also loaded 
    * 
    * @return {@link IntegrationFlow} 
    */ 
    @Bean 
    IntegrationFlow sync() { 
    return IntegrationFlows 
     .from("departmentSyncInput") 
     .handle(this::fetchDomain) 
     .enrichHeaders(s -> s.headerExpressions(h -> h.put("nextLink", "payload.getNext()"))) 
     .routeToRecipients(r -> r 
     .recipient("departmentSplitChannel") 
     .recipient(
      "departmentSyncInput", 
      p -> p.getPayload() instanceof Wrapper 
      && ((Wrapper) p.getPayload()).getNext() != null 
     )) 
     .get(); 
    } 

    /** 
    * Split data from the api into individual models and send them to the target service 
    * 
    * @return {@link IntegrationFlow} 
    */ 
    @Bean 
    IntegrationFlow split() { 
    return IntegrationFlows 
     .from("departmentSplitChannel") 
     .transform(Wrapper.class, Wrapper::getContent) 
     .split() 
     .channel(c -> c.executor(Executors.newScheduledThreadPool(100))) 
     .enrichHeaders(h -> h.header("errorChannel", "syncErrorChannel")) 
     .handle((payload, headers) -> log("Syncing", payload, payload)) 
     .transform(Department.class, transformer) 
     // exception happens here 
     .handle(DepartmentDTO.class, (payload, headers) -> service.upsertDepartment(payload)) 
     .handle((payload, headers) -> log("Synced", payload, payload)) 
     .aggregate() 
     .get(); 
    } 

錯誤處理程序:

@Bean 
    IntegrationFlow errorHandler() { 
    return IntegrationFlows 
     .from("syncErrorChannel") 
     .handle(Exception.class, (payload, headers) -> { 
     payload.printStackTrace(); 
     return payload; 
     }) 
     .get(); 
    } 

我也使用IntegrationFlows.from("errorChannel")具有相同的結果嘗試。

我也嘗試過使用Future,它的行爲相同,因此當我打電話給get()時,出現錯誤,但這仍然在最後發生。

感謝您的任何幫助。

回答

0

您的流程中沒有queue頻道定義,但我想你的意思是.channel(c -> c.executor())。如果你在這個問題上分享日誌,情況會更好。

我可以說你試圖覆蓋errorChannel標題,在Gateway的情況下是TemporaryReplyChannel

因此,錯誤發送到網關的進程並在split的情況下崩潰。

我建議你用h.header("errorChannel", "syncErrorChannel", true)來嘗試覆蓋該標題。

+0

謝謝,這似乎已經做到了。使用'c - > c.executor()'與隊列不同?這是我從XML轉換而來的DSL,但我以前沒有使用SI DSL。 –

+0

好吧,從高處來看,它確實是一個隊列,因爲如果沒有空閒線程來處理它,它會爲執行程序提供一個任務,將它們存儲在內部隊列中。我在談論的是QueueChannel針對投票特定的行爲 –