我想了解爲什麼我想用RabbitMQ的Spring雲流。我看了一下RabbitMQ Spring教程4(https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html),這基本上是我想要做的。它創建一個直接與2個隊列連接的交換機,並根據路由鍵將消息路由到Q1或Q2。春雲流RabbitMQ
整個過程是非常簡單的,如果你看教程,你創建所有的部分,綁定在一起,你準備好了。
我想知道使用Sing Cloud Stream會帶來什麼好處,如果這甚至是它的用例。很容易創建一個簡單的交換,甚至定義目標和組是直接與流。所以我想爲什麼不去更進一步,並嘗試用流處理教程案例。
我已經看到Stream有一個BinderAwareChannelResolver
這似乎是做同樣的事情。但我正在努力將它們放在一起,以達到與RabbitMQ Spring教程中相同的效果。我不知道這是否是一個依賴的問題,但我似乎從根本上誤解了這裏的東西,我想是這樣的:
spring.cloud.stream.bindings.output.destination=myDestination
spring.cloud.stream.bindings.output.group=consumerGroup
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression='key'
應的伎倆。
有沒有人有一個源和接收器的基本創建直接交換,綁定2個隊列的最小示例,並取決於路由關鍵路由到2個隊列之一,如https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html?
編輯:
下面是一組最少的代碼演示瞭如何做我問。我沒有附上build.gradle
,因爲它是直線前進(但如果有人有興趣,讓我知道)
application.properties
:建立生產者
spring.cloud.stream.bindings.output.destination=tut.direct
spring.cloud.stream.rabbit.bindings.output.producer.exchangeType=direct
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers.type
Sources.class
:建立生產者通道
public interface Sources {
String OUTPUT = "output";
@Output(Sources.OUTPUT)
MessageChannel output();
}
StatusController.class
:響應其餘呼叫併發送具有特定路由鍵的消息
/**
* Status endpoint for the health-check service.
*/
@RestController
@EnableBinding(Sources.class)
public class StatusController {
private int index;
private int count;
private final String[] keys = {"orange", "black", "green"};
private Sources sources;
private StatusService status;
@Autowired
public StatusController(Sources sources, StatusService status) {
this.sources = sources;
this.status = status;
}
/**
* Service available, service returns "OK"'.
* @return The Status of the service.
*/
@RequestMapping("/status")
public String status() {
String status = this.status.getStatus();
StringBuilder builder = new StringBuilder("Hello to ");
if (++this.index == 3) {
this.index = 0;
}
String key = keys[this.index];
builder.append(key).append(' ');
builder.append(Integer.toString(++this.count));
String payload = builder.toString();
log.info(payload);
// add kv pair - routingkeyexpression (which matches 'type') will then evaluate
// and add the value as routing key
Message<String> msg = new GenericMessage<>(payload, Collections.singletonMap("type", key));
sources.output().send(msg);
// return rest call
return status;
}
}
事情
消費者方面,性能:
spring.cloud.stream.bindings.input.destination=tut.direct
spring.cloud.stream.rabbit.bindings.input.consumer.exchangeType=direct
spring.cloud.stream.rabbit.bindings.input.consumer.bindingRoutingKey=orange
spring.cloud.stream.bindings.inputer.destination=tut.direct
spring.cloud.stream.rabbit.bindings.inputer.consumer.exchangeType=direct
spring.cloud.stream.rabbit.bindings.inputer.consumer.bindingRoutingKey=black
Sinks.class
:
public interface Sinks {
String INPUT = "input";
@Input(Sinks.INPUT)
SubscribableChannel input();
String INPUTER = "inputer";
@Input(Sinks.INPUTER)
SubscribableChannel inputer();
}
ReceiveStatus.class
:接收狀態:
@EnableBinding(Sinks.class)
public class ReceiveStatus {
@StreamListener(Sinks.INPUT)
public void receiveStatusOrange(String msg) {
log.info("I received a message. It was orange number: {}", msg);
}
@StreamListener(Sinks.INPUTER)
public void receiveStatusBlack(String msg) {
log.info("I received a message. It was black number: {}", msg);
}
}
感謝您的回答。我已經看過提到的問題,他們是我真的問這個計算器問題的原因。對於其他人來說似乎很清楚的是,對於我來說它並不是。你對BinderAwareChannelResolver的解釋證明我理解我到達了正確的角落:)。但是,我試圖設置路由鍵表達式,但它不起作用。似乎像gradle的依賴問題,但我沒有得到它的工作。這就是爲什麼我要求一個示例項目。 – maiksensi
似乎我得到了消費方工作通過: 'spring.cloud.stream.bindings.input.destination = tut.direct spring.cloud.stream.rabbit.bindings.input.consumer.exchangeType = direct spring.cloud .stream.rabbit.bindings.input.consumer.bindingRoutingKey =橙 spring.cloud.stream.bindings.inputer.destination = tut.direct spring.cloud.stream.rabbit.bindings.inputer.consumer.exchangeType =直接 彈簧.cloud.stream.rabbit.bindings.inputer.consumer.bindingRoutingKey = black' 只剩下動態設置生產者端的路由密鑰。 – maiksensi