我無法設法編寫一個服務器,通過WebSocket監聽STOMP消息。我的問題在於stomp協議和JMS消費者的創建。ActiveMQ,WebSocket和Stomp
下面的代碼上的createConnection
class StompDemo {
val uri = "ws://localhost:61614"
val topicName = "mytopic"
val broker = new BrokerService
broker.addConnector(uri)
val topic = new ActiveMQTopic(topicName)
val topics = Array[ActiveMQDestination](topic)
broker.setDestinations(topics)
broker.start
println("Started broker")
val connectionFactory = new ActiveMQConnectionFactory(uri)
val connection = connectionFactory.createConnection
println("Started connection")
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
val destination = session.createTopic(topicName)
val consumer = session.createConsumer(destination)
println("Created consumer")
while(true) {
println("Waiting for next message")
val message = consumer.receive
}
}
失敗,出現以下異常:
Could not create Transport. Reason: java.io.IOException: createTransport() method not implemented!
能否請你指出問題與此代碼? 如何使用AMQ通過WebSocket/Stomp以編程方式將JMS偵聽器配置爲隊列或主題?
感謝
新更新的代碼和ActiveMQ傳輸失敗:TCP:///127.0.0.1:51309 @ 6969】傳輸連接:TCP://127.0.0.1:51309失敗:java.io. IOException:未知數據類型:47 我想它必須與基於二進制和基於文本的相關。
仍在調查爲什麼會失敗:
package org.tj.amq
import org.apache.activemq.broker.BrokerService
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.Session
import javax.jms.TextMessage
//
// http://www.massapi.com/class/br/BrokerService.html
//
object AMQStompDemo extends MainLoop with Logging {
<<("AMQ Stomp Demo")
val uri = "tcp://localhost:6969"
val broker = new BrokerService
broker.setPersistent(false)
broker.setUseJmx(false)
broker.addConnector(uri)
broker.start
<<("Started broker")
val connectionFactory = new ActiveMQConnectionFactory(uri)
val connection = connectionFactory.createConnection
connection.start
<<("Started connection")
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
val destination = session.createQueue("test")
val consumer = session.createConsumer(destination)
while(true) {
<<("Ready to receive next message ...")
val message = consumer.receive
message match {
case tm:TextMessage => <<(s"Received text message ${tm.getText}")
case _ => <<(s"Received another message type $message")
}
}
def main(args: Array[String]): Unit = {}
}
trait Logging {
def <<(any : => Any) = println(s"${Thread.currentThread().getName} $any")
}
trait MainLoop extends Logging {
new Thread(new Runnable() {
override def run = {
<<("Starting main loop")
while(true) {
Thread.sleep(1000)
}
}
}).start
}
的傳奇仍在繼續。 只要加上broker.addConnector("ws://localhost:6971")
我可以成功地通過WS從瀏覽器連接到隊列/隊列/測試
現在,最後剩下的問題 - 我得到回調,但AMQ給了我這個
[WARN] 07 Feb 04:54:26 PM qtp1458849419-25 [] Transport Connection to: StompSocket_984548082 failed: java.io.IOException
Exception in thread "ActiveMQ InactivityMonitor Worker" java.lang.NullPointerException
at org.apache.activemq.transport.AbstractInactivityMonitor.onException(AbstractInactivityMonitor.java:314)
at org.apache.activemq.transport.AbstractInactivityMonitor$4.run(AbstractInactivityMonitor.java:215)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
收到第一條消息之後。
將帖子 嗯,我一直在遭受https://issues.apache.org/jira/browse/AMQ-5155 因此,使用AMQ版本5.9.0工程。
我的感覺是AMQ for WebSockets太脆弱了。那麼可能用Tomcat代替更保守的方法。
謝謝。所以你要說的是有效地使用tcp://而不是ws://這有意義,並且只需使用web套接字lib通過正常的腳本連接,然後對它進行排序。正確? – jts 2015-02-06 11:30:42
是的,您需要使用tcp://或ssl://與Java客戶端。如果你需要從瀏覽器連接,然後使用基於stomp的庫,可以做websockets,確保代理有一個ws或wss傳輸來做到這一點。 – 2015-02-06 18:45:10
你好。我做了你的建議:基本上啓動了tcp:// localhost:6969的代理和連接,並使用了一個來自apache-activemq-5.11.0 \ webapps-demo \ demo \ websocket的web客戶端 - 並且當我連接到ws://localhost:6969發送消息到/ queue/test(我的代碼聽測試 - 瀏覽器告訴我哎呀!丟失連接到ws:// localhost:6969 /和服務器端tcp:///127.0.0.1: 51114 @ 6969 []傳輸連接到:tcp://127.0.0.1:51114失敗:java.io.IOException:未知數據類型:47 ..任何建議? – jts 2015-02-07 08:37:32