2017-05-04 142 views
0

我是一個試圖整合qpidApache Camel的新手。我需要編寫java代碼來使用qpid從隊列讀取和寫入。javax.jms.JMSException:現有連接被遠程主機強制關閉

所以首先我從qpid網站下載了JMS例子。我試圖運行的代碼是。

/* 
* 
* Licensed to the Apache Software Foundation (ASF) under one 
* or more contributor license agreements. See the NOTICE file 
* distributed with this work for additional information 
* regarding copyright ownership. The ASF licenses this file 
* to you under the Apache License, Version 2.0 (the 
* "License"); you may not use this file except in compliance 
* with the License. You may obtain a copy of the License at 
* 
* http://www.apache.org/licenses/LICENSE-2.0 
* 
* Unless required by applicable law or agreed to in writing, 
* software distributed under the License is distributed on an 
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
* KIND, either express or implied. See the License for the 
* specific language governing permissions and limitations 
* under the License. 
* 
*/ 
package org.apache.qpid.jms.example; 

import javax.jms.Connection; 
import javax.jms.ConnectionFactory; 
import javax.jms.DeliveryMode; 
import javax.jms.Destination; 
import javax.jms.ExceptionListener; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageProducer; 
import javax.jms.Session; 
import javax.jms.TextMessage; 
import javax.naming.Context; 
import javax.naming.InitialContext; 

public class HelloWorld { 
    public static void main(String[] args) throws Exception { 
     try { 
      // The configuration for the Qpid InitialContextFactory has been supplied in 
      // a jndi.properties file in the classpath, which results in it being picked 
      // up automatically by the InitialContext constructor. 
      Context context = new InitialContext(); 

      ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup"); 
      Destination queue = (Destination) context.lookup("myQueueLookup"); 

      Connection connection = factory.createConnection(System.getProperty("USER"), System.getProperty("PASSWORD")); 
      connection.setExceptionListener(new MyExceptionListener()); 
      connection.start(); 

      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 

      MessageProducer messageProducer = session.createProducer(queue); 
      MessageConsumer messageConsumer = session.createConsumer(queue); 

      TextMessage message = session.createTextMessage("Hello world!"); 
      messageProducer.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); 
      TextMessage receivedMessage = (TextMessage) messageConsumer.receive(2000L); 

      if (receivedMessage != null) { 
       System.out.println(receivedMessage.getText()); 
      } else { 
       System.out.println("No message received within the given timeout!"); 
      } 

      connection.close(); 
     } catch (Exception exp) { 
      System.out.println("Caught exception, exiting."); 
      exp.printStackTrace(System.out); 
      System.exit(1); 
     } 
    } 

    private static class MyExceptionListener implements ExceptionListener { 
     @Override 
     public void onException(JMSException exception) { 
      System.out.println("Connection ExceptionListener fired, exiting."); 
      exception.printStackTrace(System.out); 
      System.exit(1); 
     } 
    } 
} 

這是依賴於文件::

# Set the InitialContextFactory class to use 
java.naming.factory.initial = org.apache.qpid.jms.jndi.JmsInitialContextFactory 
# Define the required ConnectionFactory instances 
# connectionfactory.<JNDI-lookup-name> = <URI> 
connectionfactory.myFactoryLookup = amqp://localhost:5672 

# Configure the necessary Queue and Topic objects 
# queue.<JNDI-lookup-name> = <queue-name> 
# topic.<JNDI-lookup-name> = <topic-name> 
queue.myQueueLookup = queue 
topic.myTopicLookup = topic 

現在我明白了序這個工作,我需要的東西被稱爲Broker服務。做一些研究,我發現我可以使用RabbitMQ來達到這個目的。所以,我下載我的Windows機器上,我試圖連接到它的端口localhost:5672

但是當我運行我的代碼,我得到的錯誤::

2017-05-04 11:28:29,329 [main   ] - ERROR JmsConnection     - Failed to connect to remote at: amqp://localhost:5672 
Caught exception, exiting. 
javax.jms.JMSException: An existing connection was forcibly closed by the remote host 
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:86) 
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:108) 
    at org.apache.qpid.jms.JmsConnection.connect(JmsConnection.java:172) 
    at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:204) 
    at org.apache.qpid.jms.example.HelloWorld.main(HelloWorld.java:48) 
Caused by: java.io.IOException: An existing connection was forcibly closed by the remote host 
    at sun.nio.ch.SocketDispatcher.read0(Native Method) 
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43) 
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) 
    at sun.nio.ch.IOUtil.read(IOUtil.java:192) 
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) 
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288) 
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1100) 
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:372) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:624) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:559) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:476) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:438) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) 
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) 
    at java.lang.Thread.run(Thread.java:745) 

爲什麼這個錯誤發生?而這個端口肯定在我的本地機器上監聽。再次我是JMS的新手,所以任何指導將不勝感激:)

+0

服務器正在運行嗎? – Ishnark

+0

是的。我做了一個'telnet 127.0.0.1 5672',它工作。 –

+0

不確定這是你的qpid版本的情況,但似乎有一個[qpid 0.6版本中的錯誤,阻止正確協商協議版本](https://www.rabbitmq.com/interoperability.html),這裏是聲明:Qpid java客戶端的0.6版本發佈了一個錯誤,導致它無法正確協商協議版本。由於它默認爲AMQP 0-10,它無法連接到任何0-8或0-9-1代理(包括RabbitMQ).' – Olivier

回答

2

正如蒂姆提到的那樣,您至少也需要確保經紀人實驗性AMQP 1.0插件已被加載,而您並未提及此操作。

但是,在這種情況下,它可能沒有太大的區別。我沒有使用JMS客戶端或對RabbitMQ的其他一些AMQP 1.0客戶端之前很成功,由於我報告的問題建立消費者和生產者時阻止他們在其軌道:https://github.com/rabbitmq/rabbitmq-amqp1.0/issues/34

你提到的研究的東西在決定使用RabbitMQ,這聽起來像你沒有綁定到現有的服務器解決方案?如果是這樣,對於支持AMQP 1.0和JMS客戶端的其他服務器經常使用,您可以嘗試使用ActiveMQ,ActiveMQ Artemis,Qpid for Java代理,Qpid C++代理或Qpid Dispatch路由器(不適用於Windows,您提及使用)等等。

1

它看起來像你可能會使用Qpid JMS AMQP v1.0 client這將無法連接到RabbitMQ,除非你使用RabbitMQ的實驗性AMQP 1.0插件。

確保您使用的是與您正在運行的代理兼容的客戶端。

相關問題