2017-08-16 108 views
3

我想了解爲什麼我想用RabbitMQ的Spring雲流。我看了一下RabbitMQ Spring教程4(https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html),這基本上是我想要做的。它創建一個直接與2個隊列連接的交換機,並根據路由鍵將消息路由到Q1或Q2。春雲流RabbitMQ

整個過程是非常簡單的,如果你看教程,你創建所有的部分,綁定在一起,你準備好了。

我想知道使用Sing Cloud Stream會帶來什麼好處,如果這甚至是它的用例。很容易創建一個簡單的交換,甚至定義目標和組是直接與流。所以我想爲什麼不去更進一步,並嘗試用流處理教程案例。

我已經看到Stream有一個BinderAwareChannelResolver這似乎是做同樣的事情。但我正在努力將它們放在一起,以達到與RabbitMQ Spring教程中相同的效果。我不知道這是否是一個依賴的問題,但我似乎從根本上誤解了這裏的東西,我想是這樣的:

spring.cloud.stream.bindings.output.destination=myDestination 
spring.cloud.stream.bindings.output.group=consumerGroup 
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression='key' 

應的伎倆。

有沒有人有一個源和接收器的基本創建直接交換,綁定2個隊列的最小示例,並取決於路由關鍵路由到2個隊列之一,如https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html

編輯

下面是一組最少的代碼演示瞭如何做我問。我沒有附上build.gradle,因爲它是直線前進(但如果有人有興趣,讓我知道)

application.properties:建立生產者

spring.cloud.stream.bindings.output.destination=tut.direct 
spring.cloud.stream.rabbit.bindings.output.producer.exchangeType=direct 
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers.type 

Sources.class:建立生產者通道

public interface Sources { 

    String OUTPUT = "output"; 

    @Output(Sources.OUTPUT) 
    MessageChannel output(); 
} 

StatusController.class:響應其餘呼叫併發送具有特定路由鍵的消息

/** 
* Status endpoint for the health-check service. 
*/ 
@RestController 
@EnableBinding(Sources.class) 
public class StatusController { 

    private int index; 

    private int count; 

    private final String[] keys = {"orange", "black", "green"}; 

    private Sources sources; 

    private StatusService status; 

    @Autowired 
    public StatusController(Sources sources, StatusService status) { 
     this.sources = sources; 
     this.status = status; 
    } 

    /** 
    * Service available, service returns "OK"'. 
    * @return The Status of the service. 
    */ 
    @RequestMapping("/status") 
    public String status() { 
     String status = this.status.getStatus(); 

     StringBuilder builder = new StringBuilder("Hello to "); 
     if (++this.index == 3) { 
      this.index = 0; 
     } 
     String key = keys[this.index]; 
     builder.append(key).append(' '); 
     builder.append(Integer.toString(++this.count)); 
     String payload = builder.toString(); 
     log.info(payload); 

     // add kv pair - routingkeyexpression (which matches 'type') will then evaluate 
     // and add the value as routing key 
     Message<String> msg = new GenericMessage<>(payload, Collections.singletonMap("type", key)); 
     sources.output().send(msg); 

     // return rest call 
     return status; 
    } 
} 
事情

消費者方面,性能:

spring.cloud.stream.bindings.input.destination=tut.direct 
spring.cloud.stream.rabbit.bindings.input.consumer.exchangeType=direct 
spring.cloud.stream.rabbit.bindings.input.consumer.bindingRoutingKey=orange 
spring.cloud.stream.bindings.inputer.destination=tut.direct 
spring.cloud.stream.rabbit.bindings.inputer.consumer.exchangeType=direct 
spring.cloud.stream.rabbit.bindings.inputer.consumer.bindingRoutingKey=black 

Sinks.class

public interface Sinks { 

    String INPUT = "input"; 

    @Input(Sinks.INPUT) 
    SubscribableChannel input(); 

    String INPUTER = "inputer"; 

    @Input(Sinks.INPUTER) 
    SubscribableChannel inputer(); 
} 

ReceiveStatus.class:接收狀態:

@EnableBinding(Sinks.class) 
public class ReceiveStatus { 
    @StreamListener(Sinks.INPUT) 
    public void receiveStatusOrange(String msg) { 
     log.info("I received a message. It was orange number: {}", msg); 
    } 

    @StreamListener(Sinks.INPUTER) 
    public void receiveStatusBlack(String msg) { 
     log.info("I received a message. It was black number: {}", msg); 
    } 
} 

回答

3

春雲流可以開發事件驅動的微服務應用程序通過使應用程序連接(通過@EnableBinding)到e使用Spring Cloud流綁定器實現的外部消息系統(Kafka,RabbitMQ,JMS綁定器等)。顯然,Spring Cloud Stream使用Spring AMQP來實現RabbitMQ binder。

BinderAwareChannelResolver適用於動態綁定支持生產者,我認爲你的情況是關於配置交換和消費者綁定到該交換。例如,根據您的標準以及具有您上面提到的屬性(routing-key-expression,destination)的單個生產者(羣組除外),您需要有2個消費者使用適當的bindingRoutingKey集合。我注意到你已經爲出站頻道配置了group。該group屬性僅適用於消費者(因此入站)。

你可能也想檢查一下:https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/issues/57,因爲我看到使用routing-key-expression的一些討論。具體而言,請使用表達式值檢查this

+0

感謝您的回答。我已經看過提到的問題,他們是我真的問這個計算器問題的原因。對於其他人來說似乎很清楚的是,對於我來說它並不是。你對BinderAwareChannelResolver的解釋證明我理解我到達了正確的角落:)。但是,我試圖設置路由鍵表達式,但它不起作用。似乎像gradle的依賴問題,但我沒有得到它的工作。這就是爲什麼我要求一個示例項目。 – maiksensi

+0

似乎我得到了消費方工作通過: 'spring.cloud.stream.bindings.input.destination = tut.direct spring.cloud.stream.rabbit.bindings.input.consumer.exchangeType = direct spring.cloud .stream.rabbit.bindings.input.consumer.bindingRoutingKey =橙 spring.cloud.stream.bindings.inputer.destination = tut.direct spring.cloud.stream.rabbit.bindings.inputer.consumer.exchangeType =直接 彈簧.cloud.stream.rabbit.bindings.inputer.consumer.bindingRoutingKey = black' 只剩下動態設置生產者端的路由密鑰。 – maiksensi