2015-12-12 178 views
2

我想知道我可以如何使RedisQueueMessageDrivenEndpoint與IntegrationFlow一起工作,以便我可以接收從列表中出列的消息,並在下面的代碼中指定它。 「redisRpopChannel()」似乎根本沒有收到任何消息。請幫忙。如何使RedisQueueMessageDrivenEndpoint與IntegrationFlow一起使用?

@Bean 
public RedisOutboundGateway redisOutboundGateway(RedisConnectionFactory connectionFactory) { 
    RedisOutboundGateway gateway = new RedisOutboundGateway(connectionFactory); 
    Jackson2JsonRedisSerializer<? extends JsonNode> serializer = new Jackson2JsonRedisSerializer<>(JsonNode.class); 
    gateway.setArgumentsSerializer(serializer); 
    return gateway; 
} 

@Bean 
public IntegrationFlow redisLpushRequestFlow(RedisOutboundGateway gateway, BeanFactory beanFactory) { 
    ExpressionArgumentsStrategy strategy = new ExpressionArgumentsStrategy(new String[]{"headers.queue", "#cmd == 'LPUSH' ? payload : null"}, true); 
    strategy.setBeanFactory(beanFactory); 
    gateway.setArgumentsStrategy(strategy); 
    return flow -> flow.publishSubscribeChannel(s->s.subscribe(f -> f 
      .enrich(e -> e.<ObjectNode>requestPayload(m -> { 
       String partition = m.getHeaders().get("correlationId").toString(); 
       ObjectNode objectNode = m.getPayload(); 
       objectNode.put(PayLoadKeys.PARTITION, partition); 
       objectNode.put(PayLoadKeys.SEQ, m.getHeaders().get("sequenceNumber").toString()); 
       return objectNode; 
      }).shouldClonePayload(false) 
        .header(RedisHeaders.COMMAND, "LPUSH").header("queue", files)) 
      .handle(gateway).channel("redisLpushResponseFlow.input"))); 
} 

@Bean 
public IntegrationFlow redisLpushResponseFlow() { 
    return flow -> flow.resequence().aggregate().<List<Long>>handle((p,h)-> { 
       ObjectNode objectNode = mapper.createObjectNode(); 
       objectNode.put(PayLoadKeys.PARTITION, h.get("correlationId").toString()); 
       if(h.get("mode").equals("debug")) { 
        objectNode.set(PayLoadKeys.DEBUG, 
          mapper.valueToTree(p.stream().collect(Collectors.toList()))); 
       } 
       return objectNode; 
      }).channel(httpInboundReplyChannel()); 
@Bean 
public MessageChannel redisRpopChannel() { 
    return MessageChannels.queue().get(); 
} 

@Bean(name = PollerMetadata.DEFAULT_POLLER) 
public PollerMetadata poller() { 
    return Pollers.fixedRate(500).get(); 
} 

@Bean 
public RedisQueueMessageDrivenEndpoint redisQueueMessageDrivenEndpoint(RedisConnectionFactory connectionFactory, BeanFactory beanFactory) { 
    RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint(files, connectionFactory); 
    Jackson2JsonRedisSerializer<? extends JsonNode> serializer = new Jackson2JsonRedisSerializer<>(JsonNode.class); 
    endpoint.setSerializer(serializer); 
    endpoint.setBeanFactory(beanFactory); 
    endpoint.setAutoStartup(true); 
    endpoint.setOutputChannel(redisRpopChannel()); 
    endpoint.afterPropertiesSet(); 
    endpoint.start(); 
    return endpoint; 
} 

@Bean 
public IntegrationFlow redisQueuePollingFlow() { 

    class ThrottledTaskExecutor implements TaskExecutor { 
     final Semaphore semaphore; 
     final TaskExecutor taskExecutor; 

     ThrottledTaskExecutor(ThreadPoolTaskExecutor taskExecutor) { 
      this.taskExecutor = taskExecutor; 
      this.semaphore = new Semaphore(taskExecutor.getCorePoolSize()); 
     } 

     @Override 
     public void execute(Runnable task) { 
      if (task == null) { 
       throw new NullPointerException("Task is null in ThrottledTaskExecutor."); 
      } 
      doSubmit(task); 
     } 

     void doSubmit(final Runnable task) { 
      try { 
       semaphore.acquire(); 
      } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
       throw new TaskRejectedException("Task could not be submitted because of a thread interruption."); 
      } 
      try { 
       taskExecutor.execute(new FutureTask<Void>(task, null) { 

        @Override 
        protected void done() { 
         semaphore.release(); 
        } 
       }); 
      } catch (TaskRejectedException e) { 
       semaphore.release(); 
       throw e; 
      } 
     } 
    } 

    return IntegrationFlows 
      .from(redisRpopChannel()) 
      .transform(Transformers.fromJson(ObjectNode.class)) 
      .handle(message -> { 
       ObjectNode p = (ObjectNode) message.getPayload(); 
       ThreadPoolTaskExecutor taskExecutor = taskExecutor(); 
       ThrottledTaskExecutor throttledTaskExecutor = new ThrottledTaskExecutor(taskExecutor); 
       if(p.hasNonNull(PayLoadKeys.ID_ARRAY)) { 
        String array = p.remove(PayLoadKeys.ID_ARRAY).asText(); 
        if (p.hasNonNull(array)) { 
         p.remove(array).forEach(id -> { 
          ObjectNode param = p.deepCopy(); 
          final Long finalId = id.asLong(); 
          param.put("id", finalId); 
          throttledTaskExecutor.execute(new JobLaunchTask(param)); 
         }); 
        } 
       } else { 
        throttledTaskExecutor.execute(new JobLaunchTask(p)); 
       } 
       taskExecutor.shutdown(); 
      }).get(); 
} 

回答

1

使用消息驅動的端點在DSL(被定義爲@Bean S)當目前a problem

問題是在初始化期間需要輸出通道。但是,當端點稍後連接到流中時,該通道將被替換。

你不應該調用@Bean定義中像afterPropertiesSet()start()方法。

這個工作對我來說...

@Bean 
public RedisConnectionFactory connectionFactory() { 
    JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(); 
    jedisConnectionFactory.setPort(6379); 
    return jedisConnectionFactory; 
} 

@Bean 
public RedisQueueMessageDrivenEndpoint redisQueueMessageDrivenEndpoint(RedisConnectionFactory connectionFactory) { 
    RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint("foo", connectionFactory); 
    Jackson2JsonRedisSerializer<? extends JsonNode> serializer = new Jackson2JsonRedisSerializer<>(JsonNode.class); 
    endpoint.setSerializer(serializer); 
    endpoint.setAutoStartup(true); 
    endpoint.setOutputChannel(new DirectChannel()); // will be replaced 
    return endpoint; 
} 

@Bean 
public IntegrationFlow flow(RedisConnectionFactory connectionFactory) { 
    return IntegrationFlows.from(redisQueueMessageDrivenEndpoint(connectionFactory)) 
      .handle(System.out::println) 
      .get(); 
} 

我在Redis的-CLI > lpush foo '{"foo":"bar"}'進行了測試。

編輯

然而,你的技術也工作(我)......

@Bean 
public RedisQueueMessageDrivenEndpoint redisQueueMessageDrivenEndpoint(RedisConnectionFactory connectionFactory) { 
    RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint("foo", connectionFactory); 
    Jackson2JsonRedisSerializer<? extends JsonNode> serializer = new Jackson2JsonRedisSerializer<>(JsonNode.class); 
    endpoint.setSerializer(serializer); 
    endpoint.setAutoStartup(true); 
    endpoint.setOutputChannel(rpopChannel()); 
    return endpoint; 
} 

@Bean 
public IntegrationFlow flow(RedisConnectionFactory connectionFactory) { 
    return IntegrationFlows.from(rpopChannel()) 
      .handle(System.out::println) 
      .get(); 
} 

@Bean 
public MessageChannel rpopChannel() { 
    return new DirectChannel(); 
} 

同樣,我刪除了從端點的所有容器管理的性能;春天設置所有這些。

+0

感謝您的評論。我非常感激。我也很佩服你的工作。我想知道爲什麼沒有協議適配器支持使用redis進行迭代並添加到組中,所以我們都可以像IntegrationFlows.from(Redis.inboundGateway(connectionFactory,queue))那樣編寫? – hanishi

+0

我們資源有限。我們要求社區幫助我們優先考慮通過[JIRA](https://jira.spring.io/browse/INTEXT)向Redis等添加一流的支持。隨意在那裏打開一個改進問題,我們會看到我們能做些什麼。我們也歡迎[貢獻](https://github.com/spring-projects/spring-integration/blob/master/CONTRIBUTING.md)。 –

相關問題