2017-10-06 74 views
0

我需要做些什麼來防止RabbitMQ推測的以下異常。如何防止ListenerExecutionFailedException:監聽器拋出異常

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:877) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:787) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:707) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:98) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:189) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1236) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:688) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1190) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1174) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1200(SimpleMessageListenerContainer.java:98) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1363) 
    at java.lang.Thread.run(Thread.java:748) 
    Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'amqpLaunchSpringBatchJobFlow.channel#0'; nested exception is jp.ixam_drive.batch.service.JobExecutionRuntimeException: Failed to start job with name ads-insights-import and parameters {accessToken=<ACCESS_TOKEN>, id=act_1234567890, classifier=stats, report_run_id=1482330625184792, job_request_id=32} 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:171) 
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$400(AmqpInboundChannelAdapter.java:45) 
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$1.onMessage(AmqpInboundChannelAdapter.java:95) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:784) 
    ... 10 common frames omitted 
    Caused by: jp.ixam_drive.batch.service.JobExecutionRuntimeException: Failed to start job with name ads-insights-import and parameters {accessToken=<ACCESS_TOKEN>, id=act_1234567890, classifier=stats, report_run_id=1482330625184792, job_request_id=32} 
    at jp.ixam_drive.facebook.SpringBatchLauncher.launchJob(SpringBatchLauncher.java:42) 
    at jp.ixam_drive.facebook.AmqpBatchLaunchIntegrationFlows.lambda$amqpLaunchSpringBatchJobFlow$1(AmqpBatchLaunchIntegrationFlows.java:71) 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) 
    ... 18 common frames omitted 
    Caused by: org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={accessToken=<ACCESS_TOKEN>, id=act_1234567890, classifier=stats, report_run_id=1482330625184792, job_request_id=32}. If you want to run this job again, change the parameters. 
    at org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:126) 
    at sun.reflect.GeneratedMethodAccessor193.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333) 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) 
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:99) 
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:282) 
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96) 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) 
    at org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean$1.invoke(AbstractJobRepositoryFactoryBean.java:172) 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) 
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213) 
    at com.sun.proxy.$Proxy125.createJobExecution(Unknown Source) 
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:125) 
    at jp.ixam_drive.batch.service.JobOperationsService.launch(JobOperationsService.java:64) 
    at jp.ixam_drive.facebook.SpringBatchLauncher.launchJob(SpringBatchLauncher.java:37) 
    ... 24 common frames omitted 

當我有兩個Spring Boot應用程序並行運行以下代碼來執行Spring Batch Jobs?

@Configuration 
@Conditional(AmqpBatchLaunchCondition.class) 
@Slf4j 
public class AmqpAsyncAdsInsightsConfiguration { 

    @Autowired 
    ObjectMapper objectMapper; 

    @Value("${batch.launch.amqp.routing-keys.async-insights}") 
    String routingKey; 

    @Bean 
    public IntegrationFlow amqpOutboundAsyncAdsInsights(AmqpTemplate amqpTemplate) { 
     return IntegrationFlows.from("async_ads_insights") 
       .<JobParameters, byte[]>transform(SerializationUtils::serialize) 
       .handle(Amqp.outboundAdapter(amqpTemplate).routingKey(routingKey)).get(); 
    } 

    @Bean 
    public IntegrationFlow amqpAdsInsightsAsyncJobRequestFlow(FacebookMarketingServiceProvider serviceProvider, 
      JobParametersToApiParametersTransformer transformer, ConnectionFactory connectionFactory) { 
     return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, routingKey)) 
       .<byte[], JobParameters>transform(SerializationUtils::deserialize) 
       .<JobParameters, ApiParameters>transform(transformer) 
       .<ApiParameters>handle((payload, header) -> { 
        String accessToken = (String) header.get("accessToken"); 
        String id = (String) header.get("object_id"); 
        FacebookMarketingApi api = serviceProvider.getApi(accessToken); 
        String reportRunId = api.asyncRequestOperations().getReportRunId(id, payload.toMap()); 
        ObjectNode objectNode = objectMapper.createObjectNode(); 
        objectNode.put("accessToken", accessToken); 
        objectNode.put("id", id); 
        objectNode.put("report_run_id", reportRunId); 
        objectNode.put("classifier", (String) header.get("classifier")); 
        objectNode.put("job_request_id", (Long) header.get("job_request_id")); 
        return serialize(objectNode); 
       }).channel("ad_report_run_polling_channel").get(); 
    } 

    @SneakyThrows 
    private String serialize(JsonNode jsonNode) { 
     return objectMapper.writeValueAsString(jsonNode); 
    } 
} 

@Configuration 
@Conditional(AmqpBatchLaunchCondition.class) 
@Slf4j 
public class AmqpBatchLaunchIntegrationFlows { 

    @Autowired 
    SpringBatchLauncher batchLauncher; 

    @Value("${batch.launch.amqp.routing-keys.job-launch}") 
    String routingKey; 

    @Bean(name = "batch_launch_channel") 
    public MessageChannel batchLaunchChannel() { 
     return MessageChannels.executor(Executors.newSingleThreadExecutor()).get(); 
    } 

    @Bean 
    public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate, 
      @Qualifier("batch_launch_channel") MessageChannel batchLaunchChannel) { 
     return IntegrationFlows.from(batchLaunchChannel) 
       .<JobParameters, byte[]>transform(SerializationUtils::serialize) 
       .handle(Amqp.outboundAdapter(amqpTemplate).routingKey(routingKey)).get(); 
    } 

    @Bean 
    public IntegrationFlow amqpLaunchSpringBatchJobFlow(ConnectionFactory connectionFactory) { 
     return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, routingKey)) 
       .handle(message -> { 
        String jobName = (String) message.getHeaders().get("job_name"); 
        byte[] bytes = (byte[]) message.getPayload(); 
        JobParameters jobParameters = SerializationUtils.deserialize(bytes); 
        batchLauncher.launchJob(jobName, jobParameters); 
       }).get(); 
    } 
} 

@Configuration 
@Slf4j 
public class AsyncAdsInsightsConfiguration { 

    @Value("${batch.core.pool.size}") 
    public Integer batchCorePoolSize; 

    @Value("${ixam_drive.facebook.api.ads-insights.async-poll-interval}") 
    public String asyncPollInterval; 

    @Autowired 
    ObjectMapper objectMapper; 

    @Autowired 
    private DataSource dataSource; 

    @Bean(name = "async_ads_insights") 
    public MessageChannel adsInsightsAsyncJobRequestChannel() { 
     return MessageChannels.direct().get(); 
    } 

    @Bean(name = "ad_report_run_polling_channel") 
    public MessageChannel adReportRunPollingChannel() { 
     return MessageChannels.executor(Executors.newFixedThreadPool(batchCorePoolSize)).get(); 
    } 

    @Bean 
    public IntegrationFlow adReportRunPollingLoopFlow(FacebookMarketingServiceProvider serviceProvider) { 
     return IntegrationFlows.from(adReportRunPollingChannel()) 
       .<String>handle((payload, header) -> { 
        ObjectNode jsonNode = deserialize(payload); 
        String accessToken = jsonNode.get("accessToken").asText(); 
        String reportRunId = jsonNode.get("report_run_id").asText(); 
        try { 
         AdReportRun adReportRun = serviceProvider.getApi(accessToken) 
           .fetchObject(reportRunId, AdReportRun.class); 
         log.debug("ad_report_run: {}", adReportRun); 
         return jsonNode.set("ad_report_run", objectMapper.valueToTree(adReportRun)); 
        } catch (Exception e) { 
         log.error("failed while polling for ad_report_run.id: {}", reportRunId); 
         throw new RuntimeException(e); 
        } 
       }).<JsonNode, Boolean>route(payload -> { 
        JsonNode adReportRun = payload.get("ad_report_run"); 
        return adReportRun.get("async_percent_completion").asInt() == 100 && 
          "Job Completed".equals(adReportRun.get("async_status").asText()); 
       }, rs -> rs.subFlowMapping(true, 
         f -> f.transform(JsonNode.class, 
           source -> { 
            JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(); 
            jobParametersBuilder 
              .addString("accessToken", source.get("accessToken").asText()); 
            jobParametersBuilder.addString("id", source.get("id").asText()); 
            jobParametersBuilder 
              .addString("classifier", source.get("classifier").asText()); 
            jobParametersBuilder 
              .addLong("report_run_id", source.get("report_run_id").asLong()); 
            jobParametersBuilder 
              .addLong("job_request_id", source.get("job_request_id").asLong()); 
            return jobParametersBuilder.toJobParameters(); 
           }).channel("batch_launch_channel")) 
         .subFlowMapping(false, 
           f -> f.transform(JsonNode.class, this::serialize) 
             .<String>delay("delay", asyncPollInterval, c -> c.transactional() 
               .messageStore(jdbcMessageStore())) 
             .channel(adReportRunPollingChannel()))).get(); 
    } 

    @SneakyThrows 
    private String serialize(JsonNode jsonNode) { 
     return objectMapper.writeValueAsString(jsonNode); 
    } 

    @SneakyThrows 
    private ObjectNode deserialize(String payload) { 
     return objectMapper.readerFor(ObjectNode.class).readValue(payload); 
    } 

    @Bean 
    public JdbcMessageStore jdbcMessageStore() { 
     JdbcMessageStore jdbcMessageStore = new JdbcMessageStore(dataSource); 
     return jdbcMessageStore; 
    } 

    @Bean 
    public JobParametersToApiParametersTransformer jobParametersToApiParametersTransformer() { 
     return new JobParametersToApiParametersTransformer() { 
      @Override 
      protected ApiParameters transform(JobParameters jobParameters) { 
       ApiParameters.ApiParametersBuilder builder = ApiParameters.builder(); 
       MultiValueMap<String, String> multiValueMap = new LinkedMultiValueMap<>(); 
       String level = jobParameters.getString("level"); 
       if (!StringUtils.isEmpty(level)) { 
        multiValueMap.set("level", level); 
       } 
       String fields = jobParameters.getString("fields"); 
       if (!StringUtils.isEmpty(fields)) { 
        multiValueMap.set("fields", fields); 
       } 
       String filter = jobParameters.getString("filter"); 
       if (filter != null) { 
        try { 
         JsonNode jsonNode = objectMapper.readTree(filter); 
         if (jsonNode != null && jsonNode.isArray()) { 
          List<ApiFilteringParameters> filteringParametersList = new ArrayList<>(); 
          List<ApiSingleValueFilteringParameters> singleValueFilteringParameters = new ArrayList<>(); 
          ArrayNode arrayNode = (ArrayNode) jsonNode; 
          arrayNode.forEach(node -> { 
           String field = node.get("field").asText(); 
           String operator = node.get("operator").asText(); 
           if (!StringUtils.isEmpty(field) && !StringUtils.isEmpty(operator)) { 
            String values = node.get("values").asText(); 
            String[] valuesArray = !StringUtils.isEmpty(values) ? values.split(",") : null; 
            if (valuesArray != null) { 
             if (valuesArray.length > 1) { 
              filteringParametersList.add(ApiFilteringParameters 
                .of(field, Operator.valueOf(operator), valuesArray)); 
             } else { 
              singleValueFilteringParameters.add(ApiSingleValueFilteringParameters 
                .of(field, Operator.valueOf(operator), valuesArray[0])); 
             } 
            } 
           } 
          }); 
          if (!filteringParametersList.isEmpty()) { 
           builder.filterings(filteringParametersList); 
          } 
          if (!singleValueFilteringParameters.isEmpty()) { 
           builder.filterings(singleValueFilteringParameters); 
          } 
         } 

        } catch (IOException e) { 
         throw new UncheckedIOException(e); 
        } 
       } 
       String start = jobParameters.getString("time_ranges.start"); 
       String end = jobParameters.getString("time_ranges.end"); 
       String since = jobParameters.getString("time_range.since"); 
       String until = jobParameters.getString("time_range.until"); 

       if (!StringUtils.isEmpty(start) && !StringUtils.isEmpty(end)) { 
        builder.timeRanges(ApiParameters.timeRanges(start, end)); 
       } else if (!StringUtils.isEmpty(since) && !StringUtils.isEmpty(until)) { 
        builder.timeRange(new TimeRange(since, until)); 
       } 
       String actionBreakdowns = jobParameters.getString("action_breakdowns"); 
       if (!StringUtils.isEmpty(actionBreakdowns)) { 
        multiValueMap.set("action_breakdowns", actionBreakdowns); 
       } 
       String attributionWindows = jobParameters.getString("action_attribution_windows"); 
       if (attributionWindows != null) { 
        try { 
         multiValueMap 
           .set("action_attribution_windows", 
             objectMapper.writeValueAsString(attributionWindows.split(","))); 
        } catch (JsonProcessingException e) { 
         e.printStackTrace(); 
        } 
       } 
       builder.multiValueMap(multiValueMap); 
       String pageSize = jobParameters.getString("pageSize"); 
       if (!StringUtils.isEmpty(pageSize)) { 
        builder.limit(pageSize); 
       } 
       return builder.build(); 
      } 
     }; 
    } 
} 

這裏是信息的流動方式:

1. channel[async_ads_insights] ->IntegrationFlow[amqpOutboundAsyncAdsInsights]->[AMQP]->IntegrationFlow[amqpAdsInsightsAsyncJobRequestFlow]->channel[ad_report_run_polling_channel]->IntegrationFlow[adReportRunPollingLoopFlow]-IF END LOOP->channel[batch_launch_channel] ELSE -> channel[ad_report_run_polling_channel] 

    2. channel[batch_launch_channel] -> IntegrationFlow[amqpOutbound]-> IntegrationFlow[amqpLaunchSpringBatchJobFlow] 

    3. Spring Batch Job is launched. 

的異常沒有拋出兩個實例啓動後,但經過一段時間。啓動Spring批處理作業成功,但隨後開始失敗,「作業實例已經存在並且已完成...」

該作業用於檢索Facebook廣告結果。

我想感謝您對上述錯誤的看法。

我也有這種配置,它不使用AMQP,沒有任何問題,但它只是一個實例。

@Configuration 
@Conditional(SimpleBatchLaunchCondition.class) 
@Slf4j 
public class SimpleBatchLaunchIntegrationFlows { 

    @Autowired 
    SpringBatchLauncher batchLauncher; 

    @Autowired 
    DataSource dataSource; 

    @Bean(name = "batch_launch_channel") 
    public MessageChannel batchLaunchChannel() { 
     return MessageChannels.queue(jdbcChannelMessageStore(), "batch_launch_channel").get(); 
    } 

    @Bean 
    public ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider() { 
     return new MySqlChannelMessageStoreQueryProvider(); 
    } 

    @Bean 
    public JdbcChannelMessageStore jdbcChannelMessageStore() { 
     JdbcChannelMessageStore channelMessageStore = new JdbcChannelMessageStore(dataSource); 
     channelMessageStore.setChannelMessageStoreQueryProvider(channelMessageStoreQueryProvider()); 
     channelMessageStore.setUsingIdCache(true); 
     channelMessageStore.setPriorityEnabled(true); 
     return channelMessageStore; 
    } 

    @Bean 
    public IntegrationFlow launchSpringBatchJobFlow(@Qualifier("batch_launch_channel") 
      MessageChannel batchLaunchChannel) { 
     return IntegrationFlows.from(batchLaunchChannel) 
       .handle(message -> { 
        String jobName = (String) message.getHeaders().get("job_name"); 
        JobParameters jobParameters = (JobParameters) message.getPayload(); 
        batchLauncher.launchJob(jobName, jobParameters); 
       }, e->e.poller(Pollers.fixedRate(500).receiveTimeout(500))).get(); 
    } 
} 
+0

我添加了不同的@Configuration,它不使用AMQP(SimpleBatchLaunchIntegrationFlows),但僅適用於單個實例(沒有工作共享) – hanishi

回答

1

請參閱Spring Batch文檔。啓動作業的新實例時,作業參數必須是唯一的。

一個常見的解決方案是添加一個帶有UUID或類似參數的僞參數,但批量提供策略,例如每次增加一個數字參數。

EDIT

存在一定的類,其中成員,其中被認爲是不可恢復(致命的),它是沒有意義的嘗試重新傳遞的異常。

示例包括MessageConversionException - 如果我們無法第一次轉換它,我們可能無法轉換爲重新傳遞。 ConditionalRejectingErrorHandler是我們檢測這種例外情況的機制,並導致它們被永久拒絕(而不是重新遞交)。

其他例外情況會導致郵件默認被重新發送 - 另一個屬性defaultRequeuRejected可以設置爲false以永久拒絕所有失敗(不推薦)。

您可以通過繼承其DefaultExceptionStrategy自定義錯誤處理程序 - 覆蓋isUserCauseFatal(Throwable cause)掃描cause樹尋找一個JobInstanceAlreadyCompleteException並返回true(cause.getCause().getCause() instanceof ...

我認爲這是由拋出的錯誤觸發「SpringBatch作業已經運行」異常。

這仍然表明你以某種方式收到第二條消息與相同的參數;這是一個不同的錯誤,因爲原始作業仍在運行;該消息被拒絕(並重新排序),但在隨後的交付中,您將獲得已完成的異常。

所以,我仍然說你的問題的根本原因是重複的請求,但你可以避免通道適配器的偵聽器容器中的自定義錯誤處理程序的行爲。

我建議你記錄重複的信息,這樣你可以找出你爲什麼得到它們。

+0

感謝您的迴應。我知道這不是工作參數,因爲每一個正在執行的工作參數都有一個唯一可識別的組合。在一段時間內,作業確實執行並且沒有錯誤地完成,直到拋出這個異常,並且當我開始在控制檯中看到兩個實例都可能從AMQP中獲取相同的作業可識別參數時,這些參數直到前面提到的纔會發生拋出異常。我認爲「聽衆拋出異常」是這種情況,我想知道如何防止這種情況。 – hanishi

+0

這是明確的。如果你得到這個異常,你試圖用相同的參數開始一項工作。它與rabbitmq無關。調試日誌應該可以幫助你追蹤它。 –

+0

我看到的是ConditionalRejectingErrorHandler日誌中 2017-10-07 10:26:24.572警告55577 --- [erContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler:執行Rabbit消息偵聽器失敗。 根據文檔,它將「有條件地把異常包裝在一個AmqpRejectAndDontRequeueException」中,我猜測這導致同一個作業的「重新運行 」具有相同的參數。 什麼是AmqpRejectAndDontRequeueException? – hanishi