PublishSubscribeChannel with a TaskExecutor - thread behavior
I have a simple Spring Integration DSL flow, shown below:
@Configuration
public class OrderFlow {

    private static final Logger logger = LoggerFactory.getLogger(OrderFlow.class);

    @Autowired
    private OrderSubFlow orderSubFlow;

    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Bean
    public IntegrationFlow orders() {
        return IntegrationFlows.from(MessageChannels.direct("order_input").get())
                .handle(new GenericHandler<Order>() {
                    @Override
                    public Object handle(Order order, Map<String, Object> headers) {
                        logger.info("Pre-Processing order with id: {}", order.getId());
                        return MessageBuilder.withPayload(order).copyHeaders(headers).build();
                    }
                })
                .publishSubscribeChannel(threadPoolTaskExecutor, new Consumer<PublishSubscribeSpec>() {
                    @Override
                    public void accept(PublishSubscribeSpec t) {
                        t.subscribe(orderSubFlow);
                    }
                })
                .handle(new GenericHandler<Order>() {
                    @Override
                    public Object handle(Order order, Map<String, Object> headers) {
                        logger.info("Post-Processing order with id: {}", order.getId());
                        return MessageBuilder.withPayload(order).copyHeaders(headers).build();
                    }
                })
                .get();
    }

    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setMaxPoolSize(2);
        threadPoolTaskExecutor.setCorePoolSize(2);
        threadPoolTaskExecutor.setQueueCapacity(10);
        return threadPoolTaskExecutor;
    }
}
And OrderSubFlow is:
@Configuration
public class OrderSubFlow implements IntegrationFlow {

    private static final Logger logger = LoggerFactory.getLogger(OrderSubFlow.class);

    @Override
    public void configure(IntegrationFlowDefinition<?> flow) {
        flow.handle(new GenericHandler<Order>() {
            @Override
            public Object handle(Order order, Map<String, Object> headers) {
                logger.info("Processing order with id: {}", order.getId());
                return null;
            }
        });
    }
}
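When I send a message to the "order_input" channel, the first handler in OrderFlow executes on the main thread and the OrderSubFlow handler executes on a TaskExecutor thread, which is what I expected. But the second handler in OrderFlow also executes on a TaskExecutor thread. Is this the expected behavior? Shouldn't the second OrderFlow handler execute on the main thread itself?
Please see the log below.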
INFO 9648 --- [ main] com.example.flows.OrderFlow : Pre-Processing order with id: 10
INFO 9648 --- [lTaskExecutor-1] com.example.flows.OrderSubFlow : Processing order with id: 10
INFO 9648 --- [lTaskExecutor-2] com.example.flows.OrderFlow : Post-Processing order with id: 10
Here is the gateway I am using:
@MessagingGateway
public interface OrderService {

    @Gateway(requestChannel = "order_input")
    Order processOrder(Order order);
}
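To illustrate why the log might look this way, here is a minimal plain-Java sketch, not Spring Integration code. It assumes that a publish-subscribe channel configured with an executor hands every subscriber to that executor, and that the rest of the main flow after publishSubscribeChannel(...) is simply one more subscriber of the same channel. The ExecutorPubSub class, the "executor-" thread name prefix and the log strings are hypothetical names made up for the illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class PubSubThreadSketch {

    /** Hypothetical stand-in for a publish-subscribe channel backed by an executor. */
    static final class ExecutorPubSub<T> {
        private final ExecutorService executor;
        private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

        ExecutorPubSub(ExecutorService executor) {
            this.executor = executor;
        }

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        /** Assumption: every subscriber is invoked on the executor, never on the caller. */
        void send(T payload) {
            for (Consumer<T> subscriber : subscribers) {
                executor.execute(() -> subscriber.accept(payload));
            }
        }
    }

    /** Runs one message through the sketch and returns which thread ran each step. */
    static List<String> run() {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(
                2, r -> new Thread(r, "executor-" + counter.incrementAndGet()));
        List<String> log = new CopyOnWriteArrayList<>();

        ExecutorPubSub<Integer> channel = new ExecutorPubSub<>(pool);
        // The explicitly subscribed sub-flow.
        channel.subscribe(id -> log.add("sub-flow on " + Thread.currentThread().getName()));
        // The remainder of the main flow, modelled as a second subscriber.
        channel.subscribe(id -> log.add("post-processing on " + Thread.currentThread().getName()));

        // The step before the channel runs on the calling thread.
        log.add("pre-processing on " + Thread.currentThread().getName());
        channel.send(10);

        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Under that assumption, only the step before the channel runs on the calling thread. Both subscribers report an executor thread, which matches the pattern in the log above. The order of the two subscriber lines can vary between runs.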