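How do I make RedisQueueMessageDrivenEndpoint work with IntegrationFlow?
I would like to know how to make RedisQueueMessageDrivenEndpoint work with IntegrationFlow so that I can receive the messages dequeued from the list, as configured in the code below. `redisRpopChannel()` does not seem to receive any messages at all. Please help.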
@Bean
public RedisOutboundGateway redisOutboundGateway(RedisConnectionFactory connectionFactory) {
    RedisOutboundGateway gateway = new RedisOutboundGateway(connectionFactory);
    Jackson2JsonRedisSerializer<? extends JsonNode> serializer = new Jackson2JsonRedisSerializer<>(JsonNode.class);
    gateway.setArgumentsSerializer(serializer);
    return gateway;
}
@Bean
public IntegrationFlow redisLpushRequestFlow(RedisOutboundGateway gateway, BeanFactory beanFactory) {
    ExpressionArgumentsStrategy strategy = new ExpressionArgumentsStrategy(
            new String[]{"headers.queue", "#cmd == 'LPUSH' ? payload : null"}, true);
    strategy.setBeanFactory(beanFactory);
    gateway.setArgumentsStrategy(strategy);
    return flow -> flow.publishSubscribeChannel(s -> s.subscribe(f -> f
            .enrich(e -> e.<ObjectNode>requestPayload(m -> {
                        // Copy the correlation id and sequence number from the headers into the payload.
                        String partition = m.getHeaders().get("correlationId").toString();
                        ObjectNode objectNode = m.getPayload();
                        objectNode.put(PayLoadKeys.PARTITION, partition);
                        objectNode.put(PayLoadKeys.SEQ, m.getHeaders().get("sequenceNumber").toString());
                        return objectNode;
                    }).shouldClonePayload(false)
                    .header(RedisHeaders.COMMAND, "LPUSH").header("queue", files))
            .handle(gateway).channel("redisLpushResponseFlow.input")));
}
@Bean
public IntegrationFlow redisLpushResponseFlow() {
    return flow -> flow.resequence().aggregate().<List<Long>>handle((p, h) -> {
        ObjectNode objectNode = mapper.createObjectNode();
        objectNode.put(PayLoadKeys.PARTITION, h.get("correlationId").toString());
        if (h.get("mode").equals("debug")) {
            objectNode.set(PayLoadKeys.DEBUG,
                    mapper.valueToTree(p.stream().collect(Collectors.toList())));
        }
        return objectNode;
    }).channel(httpInboundReplyChannel());
}
@Bean
public MessageChannel redisRpopChannel() {
    return MessageChannels.queue().get();
}

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
    return Pollers.fixedRate(500).get();
}
@Bean
public RedisQueueMessageDrivenEndpoint redisQueueMessageDrivenEndpoint(RedisConnectionFactory connectionFactory, BeanFactory beanFactory) {
    RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint(files, connectionFactory);
    Jackson2JsonRedisSerializer<? extends JsonNode> serializer = new Jackson2JsonRedisSerializer<>(JsonNode.class);
    endpoint.setSerializer(serializer);
    endpoint.setBeanFactory(beanFactory);
    endpoint.setAutoStartup(true);
    endpoint.setOutputChannel(redisRpopChannel());
    endpoint.afterPropertiesSet();
    endpoint.start();
    return endpoint;
}
@Bean
public IntegrationFlow redisQueuePollingFlow() {
    // Limits the number of in-flight tasks to the pool's core size by blocking submitters on a semaphore.
    class ThrottledTaskExecutor implements TaskExecutor {
        final Semaphore semaphore;
        final TaskExecutor taskExecutor;

        ThrottledTaskExecutor(ThreadPoolTaskExecutor taskExecutor) {
            this.taskExecutor = taskExecutor;
            this.semaphore = new Semaphore(taskExecutor.getCorePoolSize());
        }

        @Override
        public void execute(Runnable task) {
            if (task == null) {
                throw new NullPointerException("Task is null in ThrottledTaskExecutor.");
            }
            doSubmit(task);
        }

        void doSubmit(final Runnable task) {
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new TaskRejectedException("Task could not be submitted because of a thread interruption.");
            }
            try {
                // Release the permit once the task has finished.
                taskExecutor.execute(new FutureTask<Void>(task, null) {
                    @Override
                    protected void done() {
                        semaphore.release();
                    }
                });
            } catch (TaskRejectedException e) {
                semaphore.release();
                throw e;
            }
        }
    }

    return IntegrationFlows
            .from(redisRpopChannel())
            .transform(Transformers.fromJson(ObjectNode.class))
            .handle(message -> {
                ObjectNode p = (ObjectNode) message.getPayload();
                ThreadPoolTaskExecutor taskExecutor = taskExecutor();
                ThrottledTaskExecutor throttledTaskExecutor = new ThrottledTaskExecutor(taskExecutor);
                if (p.hasNonNull(PayLoadKeys.ID_ARRAY)) {
                    // Launch one job per id found in the array named by ID_ARRAY.
                    String array = p.remove(PayLoadKeys.ID_ARRAY).asText();
                    if (p.hasNonNull(array)) {
                        p.remove(array).forEach(id -> {
                            ObjectNode param = p.deepCopy();
                            final Long finalId = id.asLong();
                            param.put("id", finalId);
                            throttledTaskExecutor.execute(new JobLaunchTask(param));
                        });
                    }
                } else {
                    throttledTaskExecutor.execute(new JobLaunchTask(p));
                }
                taskExecutor.shutdown();
            }).get();
}
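For anyone who wants to try the semaphore throttling idea used by `ThrottledTaskExecutor` in isolation, here is a minimal sketch in plain Java with no Spring or Redis classes. The names `ThrottleSketch`, `ThrottledExecutor`, and `peakConcurrency` are hypothetical and do not come from the code above, and the sketch releases the permit in a `finally` block rather than in `FutureTask.done()`. It only illustrates the pattern under those assumptions; it does not address why `redisRpopChannel()` receives no messages.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-alone demo; not part of the original post. */
class ThrottleSketch {

    /** Blocks the submitting thread until a permit is free, then delegates. */
    static final class ThrottledExecutor implements Executor {
        private final Executor delegate;
        private final Semaphore permits;

        ThrottledExecutor(Executor delegate, int maxInFlight) {
            this.delegate = delegate;
            this.permits = new Semaphore(maxInFlight);
        }

        @Override
        public void execute(Runnable task) {
            try {
                permits.acquire();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Interrupted while waiting for a permit", e);
            }
            try {
                delegate.execute(() -> {
                    try {
                        task.run();
                    } finally {
                        permits.release(); // free the slot when the task ends
                    }
                });
            } catch (RejectedExecutionException e) {
                permits.release(); // the task never started, so return its permit
                throw e;
            }
        }
    }

    /** Runs taskCount short tasks and returns the highest number observed running at once. */
    static int peakConcurrency(int maxInFlight, int taskCount) {
        // The pool is deliberately larger than the limit, so the semaphore is what throttles.
        ExecutorService pool = Executors.newFixedThreadPool(maxInFlight * 2);
        ThrottledExecutor throttled = new ThrottledExecutor(pool, maxInFlight);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(taskCount);
        try {
            for (int i = 0; i < taskCount; i++) {
                throttled.execute(() -> {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(20); // simulate work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                        done.countDown();
                    }
                });
            }
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak = " + peakConcurrency(2, 8));
    }
}
```

Because each task decrements the `running` counter before its permit is released, the observed peak should never exceed `maxInFlight`, even though the underlying pool has more threads than that.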
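Thank you for your comment. I really appreciate it, and I admire your work as well. I wonder why Redis has not been added to the set of supported protocol adapters, so that we could all write something like IntegrationFlows.from(Redis.inboundGateway(connectionFactory, queue))? – hanishi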
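Our resources are limited. We ask the community to help us prioritize adding first-class support for Redis and others through [JIRA](https://jira.spring.io/browse/INTEXT). Feel free to open an improvement issue there and we will see what we can do. [Contributions](https://github.com/spring-projects/spring-integration/blob/master/CONTRIBUTING.md) are also welcome. –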