我想弄清楚如何使用彈性引導流與MQueue使用spring-cloud-stream-binder-ibm-mq。我可以連接到MQueue,但獲得Could not provision topic 'queue///EMB_DEV_QUEUE'
和MQJE001: Completion Code '2', Reason '2035'
。我確實與管理員確認這是一個隊列,而不是一個主題。Spring Stream試圖創建主題而不是隊列IBM MQueue
我可以連接使用一些示例代碼使用MQQueueConnectionFactory
基於simplest-sample-applications-using-websphere-mq-jms所以我知道MQueue工作。
這是我的程序。我用卡夫卡的相同模式成功。
@EnableBinding({Sink.class, Source.class})
@SpringBootApplication
public class MQueueStreamApplication {
private final static AtomicInteger counter = new AtomicInteger();
private final Logger logger = LoggerFactory.getLogger(getClass());
public static void main(String[] args) {
SpringApplication.run(MQueueStreamApplication.class, args);
}
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedRate = "2000"))
public MessageSource<String> timeSource() {
return() -> {
String message = String.format("Timed Message %d", counter.incrementAndGet());
logger.info("Producing Message: {}", message);
return MessageBuilder.withPayload(message).setHeader("Message-Type", "mqueue-stream").build();
};
}
@ServiceActivator(inputChannel = Sink.INPUT)
public void serviceSink(Message<String> message) {
String payload = String.valueOf(message.getPayload());
logger.info("Received Message: {} [{}]", payload, message.getHeaders());
}
}
這是我的application.yml
。我曾嘗試使用和不使用queue:///
前綴。示例程序使用前綴。
spring:
cloud:
stream:
bindings:
input:
destination: queue:///EMB_DEV_QUEUE
group: mqueue-stream
# binder: ibmmq
output:
destination: queue:///EMB_DEV_QUEUE
ibmmq:
host: vm-dev-q01.corp.int
port: 1414
channel: EMB_DEV_CHANNEL
queueManager: EMB_DEV_QMGR
這是我的Gradle構建。
buildscript {
ext {
springBootVersion = '1.5.3.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8
targetCompatibility = 1.8
repositories {
mavenLocal()
mavenCentral()
}
dependencies {
compile('org.springframework.boot:spring-boot-starter-actuator')
compile('org.springframework.cloud:spring-cloud-stream')
compile('org.springframework.cloud:spring-cloud-stream-binder-jms-ibm-mq:1.0.0.BUILD-SNAPSHOT')
testCompile('org.springframework.boot:spring-boot-starter-test')
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:Dalston.RELEASE"
}
}
我按照指示構建了spring-cloud-stream-binder-ibm-mq
。我從MQueue安裝中獲得了兩個必需的罐子。該清單說9.0.0.0
版本,所以我用9
在pom.xml
我是新的MQueue和有限的經驗與流。我已經能夠成功連接到Kafka。我將不勝感激任何幫助。
Wes。
我看過幾次還是不太確定。可能我對MQ的經驗不足。 IBM文檔暗示一個主題只是一個標籤,但是通過它看起來是訪問主題的代碼,然後使用topic.queue約定在主題中排隊。這聽起來像一個結構,或者只是一個訂閱?我的目標是讓消費者在無法處理消息時返回消息。首先,我想到Client_Ack,但現在想着2個目的地。一個用於重試,另一個用於重試失敗。他們是否必須成爲話題?希望重試成爲相同的隊列。沒有什麼不同。謝謝 – Wes