2017-04-26 36 views
1

我有一個簡單的彈簧DSL流程如下:PublishSubscribeChannel使用的TaskExecutor - 線程的行爲

@Configuration 
public class OrderFlow { 

private static final Logger logger = LoggerFactory.getLogger(OrderFlow.class); 

@Autowired 
private OrderSubFlow orderSubFlow; 

@Autowired 
private ThreadPoolTaskExecutor threadPoolTaskExecutor; 

@Bean 
public IntegrationFlow orders() { 

    return IntegrationFlows.from(MessageChannels.direct("order_input").get()).handle(new GenericHandler<Order>() { 
     @Override 
     public Object handle(Order order, Map<String, Object> headers) { 
      logger.info("Pre-Processing order with id: {}", order.getId()); 
      return MessageBuilder.withPayload(order).copyHeaders(headers).build(); 
     } 
    }).publishSubscribeChannel(threadPoolTaskExecutor, new Consumer<PublishSubscribeSpec>() { 
     @Override 
     public void accept(PublishSubscribeSpec t) { 
      t.subscribe(orderSubFlow); 
     } 
    }).handle(new GenericHandler<Order>() { 
     @Override 
     public Object handle(Order order, Map<String, Object> headers) { 
      logger.info("Post-Processing order with id: {}", order.getId()); 
      return MessageBuilder.withPayload(order).copyHeaders(headers).build(); 
     } 
    }).get(); 

} 

@Bean 
public ThreadPoolTaskExecutor threadPoolTaskExecutor() { 
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); 
    threadPoolTaskExecutor.setMaxPoolSize(2); 
    threadPoolTaskExecutor.setCorePoolSize(2); 
    threadPoolTaskExecutor.setQueueCapacity(10); 
    return threadPoolTaskExecutor; 
} 

}

而且OrderSubFlow是

@Configuration 
public class OrderSubFlow implements IntegrationFlow { 

private static final Logger logger = LoggerFactory.getLogger(OrderSubFlow.class); 

@Override 
public void configure(IntegrationFlowDefinition<?> flow) { 
    flow.handle(new GenericHandler<Order>() { 
     @Override 
     public Object handle(Order order, Map<String, Object> headers) { 
      logger.info("Processing order with id: {}", order.getId()); 
      return null; 
     } 
    }); 

} 

}

當我把消息進入「order_input」信道,它的執行第一處理程序OrderFlow在主threa TaskExecutor線程中的d和OrderSubFlow處理程序,這是預期的。但OrderFlow第二個處理程序也在TaskExecutor線程中執行。這是預期的行爲嗎?不應該在主線程本身中執行OrderFlow第二個處理程序嗎?

請參閱下面的日誌。

INFO 9648 --- [   main] com.example.flows.OrderFlow    : Pre-Processing order with id: 10 
INFO 9648 --- [lTaskExecutor-1] com.example.flows.OrderSubFlow   : Processing order with id: 10 
INFO 9648 --- [lTaskExecutor-2] com.example.flows.OrderFlow    : Post-Processing order with id: 10 

這裏是我使用

@MessagingGateway 
public interface OrderService { 

@Gateway(requestChannel="order_input") 
Order processOrder(Order order); 

} 

回答

1

請網關,閱讀https://jira.spring.io/browse/INT-4264的討論。這是真正的預期行爲。僅僅因爲該處理程序是publishSubscribeChannel的另一個訂閱者。

如果其中一個收件人是帶有Executor的pub-sub,而另一個收件人是在主線程中繼續的DirectChannel,則可以使用.routeToRecipients()進行設置。