2017-04-21 23 views
0

我想弄清楚如何使用彈性引導流與MQueue使用spring-cloud-stream-binder-ibm-mq。我可以連接到MQueue,但獲得Could not provision topic 'queue///EMB_DEV_QUEUE'MQJE001: Completion Code '2', Reason '2035'。我確實與管理員確認這是一個隊列,而不是一個主題。Spring Stream試圖創建主題而不是隊列IBM MQueue

我可以連接使用一些示例代碼使用MQQueueConnectionFactory基於simplest-sample-applications-using-websphere-mq-jms所以我知道MQueue工作。

這是我的程序。我用卡夫卡的相同模式成功。

@EnableBinding({Sink.class, Source.class}) 
@SpringBootApplication 
public class MQueueStreamApplication { 

    private final static AtomicInteger counter = new AtomicInteger(); 
    private final  Logger  logger = LoggerFactory.getLogger(getClass()); 

    public static void main(String[] args) { 
     SpringApplication.run(MQueueStreamApplication.class, args); 
    } 

    @Bean 
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedRate = "2000")) 
    public MessageSource<String> timeSource() { 
     return() -> { 
      String message = String.format("Timed Message %d", counter.incrementAndGet()); 

      logger.info("Producing Message: {}", message); 

      return MessageBuilder.withPayload(message).setHeader("Message-Type", "mqueue-stream").build(); 
     }; 
    } 

    @ServiceActivator(inputChannel = Sink.INPUT) 
    public void serviceSink(Message<String> message) { 
     String payload = String.valueOf(message.getPayload()); 

     logger.info("Received Message: {} [{}]", payload, message.getHeaders()); 
    } 

} 

這是我的application.yml。我曾嘗試使用和不使用queue:///前綴。示例程序使用前綴。

spring: 
    cloud: 
    stream: 
     bindings: 
     input: 
      destination: queue:///EMB_DEV_QUEUE 
      group: mqueue-stream 
#   binder: ibmmq 
     output: 
      destination: queue:///EMB_DEV_QUEUE 

ibmmq: 
    host: vm-dev-q01.corp.int 
    port: 1414 
    channel: EMB_DEV_CHANNEL 
    queueManager: EMB_DEV_QMGR 

這是我的Gradle構建。

buildscript { 
    ext { 
     springBootVersion = '1.5.3.RELEASE' 
    } 
    repositories { 
     mavenCentral() 
    } 
    dependencies { 
     classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") 
    } 
} 

apply plugin: 'java' 
apply plugin: 'eclipse' 
apply plugin: 'idea' 
apply plugin: 'org.springframework.boot' 

version = '0.0.1-SNAPSHOT' 

sourceCompatibility = 1.8 
targetCompatibility = 1.8 

repositories { 
    mavenLocal() 
    mavenCentral() 
} 


dependencies { 
    compile('org.springframework.boot:spring-boot-starter-actuator') 

    compile('org.springframework.cloud:spring-cloud-stream') 
    compile('org.springframework.cloud:spring-cloud-stream-binder-jms-ibm-mq:1.0.0.BUILD-SNAPSHOT') 

    testCompile('org.springframework.boot:spring-boot-starter-test') 
} 

dependencyManagement { 
    imports { 
     mavenBom "org.springframework.cloud:spring-cloud-dependencies:Dalston.RELEASE" 
    } 
} 

我按照指示構建了spring-cloud-stream-binder-ibm-mq。我從MQueue安裝中獲得了兩個必需的罐子。該清單說9.0.0.0版本,所以我用9pom.xml

我是新的MQueue和有限的經驗與流。我已經能夠成功連接到Kafka。我將不勝感激任何幫助。

Wes。

回答

0

Spring Cloud Stream比常規的JMS/IBM-MQ應用程序使用更多自以爲是的基礎架構,以便能夠實現消費者組和分區等功能 - 在這種情況下,目標是一個主題 - 有關詳細信息,請參閱https://github.com/spring-cloud/spring-cloud-stream-binder-ibm-mq#how-it-works

+0

我看過幾次還是不太確定。可能我對MQ的經驗不足。 IBM文檔暗示一個主題只是一個標籤,但是通過它看起來是訪問主題的代碼,然後使用topic.queue約定在主題中排隊。這聽起來像一個結構,或者只是一個訂閱?我的目標是讓消費者在無法處理消息時返回消息。首先,我想到Client_Ack,但現在想着2個目的地。一個用於重試,另一個用於重試失敗。他們是否必須成爲話題?希望重試成爲相同的隊列。沒有什麼不同。謝謝 – Wes