0
我無法通過kaazing javascript AMPQ庫發送發佈消息。下面的代碼完全適用於FANOUT交換,但它不適用於DIRECT交換。我能夠連接但不能將消息發佈到DIRECT交換。通過kaazing javascript庫使用rabbitMQ DIRECT交換時出錯
function amqpTest(){
load();
var $ = function(s) { return document.getElementById(s); };
var console = $("console");
var log = function(message){
var pre = document.createElement("pre");
pre.style.wordWrap = "break-word";
pre.innerHTML = message;
console.insertBefore(pre, console.firstChild);
while(console.childNodes.length > 500){
console.removeChild(console.lastChild);
}
}
var url = "ws://localhost:8000/amqp";
var username = "guest";
var password = "guest";
var connect = $("connect");
var disconnect= $("disconnect");
var consumeExchange = "demo_direct_exchange";
var consumeMessageText = "call";
var alias = "player";
var myQueueName = "clientqueue";
var myConsumerTag = "clientkey";
var routingKey = "broadcastkey";
var exchangeName = consumeExchange;
var send = $("send");
connect.onclick = function(){
log("Connecting: "+ url + " " + username);
myQueueName = "client" + Math.floor(Math.random() * 1000000);
myConsumerTag = "client" + Math.floor(Math.random() * 1000000);
var version = "0-9-0";
amqp = new AmqpClient();
amqp.addEventListener("close", function(){
log("Disconnected");
});
amqp.connect(url, '/', {username:username, password:password}, version, openHandler);
}
var openHandler = function(){
log("CONNECTED");
log("Open Publish Channel...");
publishChannel = amqp.openChannel(publishChannelOpenHandler);
log("Open Consume Channel...");
consumeChannel = amqp.openChannel(consumeChannelOpenHandler);
};
var publishChannelOpenHandler = function(channel) {
log("Opened Publish Channel");
publishChannel.declareExchange(exchangeName, "direct", false, false, false);
publishChannel.addEventListener("declareexchange", function(){log("Exchange Declared : "+exchangeName)});
publishChannel.addEventListener("close", function() { log("Channel Closed : Publish Channel")});
};
var consumeChannelOpenHandler = function(channel) {
log("Opened Consumed Channel");
consumeChannel.addEventListener("declarequeue", function(){ log("Queue Declared : " +myQueueName); });
consumeChannel.addEventListener("bindqueue", function() { log("QUEUE BOUND: " + exchangeName + " " + myQueueName)});
consumeChannel.addEventListener("subscribe", function() { log("CONSUME: " + myQueueName)});
consumeChannel.addEventListener("close", function() {log("CHANNEL CLOSED: consume channel");});
consumeChannel.addEventListener("message", messageHandler);
consumeChannel.declareQueue(myQueueName, false, false, false, false, false)
.bindQueue(myQueueName, exchangeName, routingKey, false)
.consumeBasic(myQueueName, myConsumerTag, false, true, true, false);
}
var messageHandler = function(m){
alert("A");
var body = m.body.getString(Charset.UTF8);
if (body == "start_auction"){
ss();
}
if (body == "new_bidder"){
alert("New Bidder");
}
//log(body);
};
disconnect.onclick = function() {
log("DISCONNECT");
amqp.disconnect();
};
send.onclick = function(){
log ("SENDING MESSAGE ....");
var body = new ByteBuffer();
body.putString("new_bidder", Charset.UTF8);
body.flip();
var headers = {};
publishChannel.publishBasic(body, headers, consumeExchange, routingKey, false, false);
};
$("clear").onclick = function() { while (console.childNodes.length > 0)
{
console.removeChild(console.lastChild);
}
};
}
任何幫助將不勝感激。