2012-06-21 63 views
0

我無法通過 Kaazing JavaScript AMQP 庫發佈消息。下面的代碼在 FANOUT 交換上完全正常,但在 DIRECT 交換上不起作用:我能夠連接,卻無法將消息發佈到 DIRECT 交換。

(標題:通過 Kaazing JavaScript 庫使用 RabbitMQ DIRECT 交換時出錯)

/**
 * Wires the demo page to an AMQP broker through the Kaazing AmqpClient.
 *
 * Looks up the "console", "connect", "disconnect", "send" and "clear"
 * elements by id and installs click handlers on the four buttons:
 *   - connect:    opens the client, then one publish and one consume channel;
 *   - send:       publishes the text "new_bidder" to the direct exchange;
 *   - disconnect: closes the client connection;
 *   - clear:      empties the on-page log.
 *
 * Relies on names provided elsewhere on the page: load(), AmqpClient,
 * ByteBuffer, Charset and ss().
 */
function amqpTest() {
    load();

    var $ = function (id) { return document.getElementById(id); };
    // Named logPane (not "console") so the global console stays reachable.
    var logPane = $("console");
    var MAX_LOG_ENTRIES = 500;

    /**
     * Prepends one entry to the on-page log and trims the oldest entries
     * so that at most MAX_LOG_ENTRIES remain.
     */
    var log = function (message) {
        var pre = document.createElement("pre");
        pre.style.wordWrap = "break-word";
        // textContent instead of innerHTML: log entries are plain text and
        // must never be interpreted as markup.
        pre.textContent = message;
        logPane.insertBefore(pre, logPane.firstChild);
        while (logPane.childNodes.length > MAX_LOG_ENTRIES) {
            logPane.removeChild(logPane.lastChild);
        }
    };

    var url = "ws://localhost:8000/amqp";
    var username = "guest";
    var password = "guest";
    var connect = $("connect");
    var disconnect = $("disconnect");
    var send = $("send");

    var consumeExchange = "demo_direct_exchange";
    var exchangeName = consumeExchange;
    var routingKey = "broadcastkey";

    // Placeholders; both are replaced by random names on every connect.
    var myQueueName = "clientqueue";
    var myConsumerTag = "clientkey";

    // Connection state. Declared here so the handlers below share it without
    // leaking implicit globals (which also throw in strict/module code).
    var amqp;
    var publishChannel;
    var consumeChannel;
    // True only between the publish channel's open callback and its close
    // event, so "send" can refuse to publish on a dead channel.
    var publishChannelReady = false;

    /** Formats whatever an "error" event delivers into a loggable string. */
    var describeError = function (e) {
        return (e && e.message) ? e.message : String(e);
    };

    connect.onclick = function () {
        log("Connecting: " + url + " " + username);
        myQueueName = "client" + Math.floor(Math.random() * 1000000);
        myConsumerTag = "client" + Math.floor(Math.random() * 1000000);

        var version = "0-9-0";
        amqp = new AmqpClient();
        amqp.addEventListener("close", function () {
            publishChannelReady = false;
            log("Disconnected");
        });

        amqp.connect(url, '/', {username: username, password: password}, version, openHandler);
    };

    /** Called once the client is connected; opens both channels. */
    var openHandler = function () {
        log("CONNECTED");
        log("Open Publish Channel...");
        publishChannel = amqp.openChannel(publishChannelOpenHandler);
        log("Open Consume Channel...");
        consumeChannel = amqp.openChannel(consumeChannelOpenHandler);
    };

    /** Called when the publish channel is open; declares the exchange. */
    var publishChannelOpenHandler = function (channel) {
        log("Opened Publish Channel");
        publishChannelReady = true;

        // Register listeners BEFORE issuing the command so no event is missed.
        publishChannel.addEventListener("declareexchange", function () {
            log("Exchange Declared : " + exchangeName);
        });
        publishChannel.addEventListener("close", function () {
            publishChannelReady = false;
            log("Channel Closed : Publish Channel");
        });
        // NOTE(review): assumes the channel dispatches an "error" event;
        // confirm against the Kaazing AmqpChannel documentation.
        publishChannel.addEventListener("error", function (e) {
            log("Publish Channel Error : " + describeError(e));
        });

        // NOTE(review): if this exchange already exists on the broker with
        // different attributes (type/durable), the broker is expected to
        // reject the re-declaration and close this channel, after which
        // nothing can be published. If "Channel Closed : Publish Channel"
        // shows up right after connecting, align these arguments with the
        // existing exchange or drop this call.
        publishChannel.declareExchange(exchangeName, "direct", false, false, false);
    };

    /** Called when the consume channel is open; declares, binds, subscribes. */
    var consumeChannelOpenHandler = function (channel) {
        log("Opened Consumed Channel");
        consumeChannel.addEventListener("declarequeue", function () {
            log("Queue Declared : " + myQueueName);
        });
        consumeChannel.addEventListener("bindqueue", function () {
            log("QUEUE BOUND: " + exchangeName + " " + myQueueName);
        });
        consumeChannel.addEventListener("subscribe", function () {
            log("CONSUME: " + myQueueName);
        });
        consumeChannel.addEventListener("close", function () {
            log("CHANNEL CLOSED: consume channel");
        });
        // NOTE(review): same assumption about the "error" event as above.
        consumeChannel.addEventListener("error", function (e) {
            log("Consume Channel Error : " + describeError(e));
        });
        consumeChannel.addEventListener("message", messageHandler);

        consumeChannel.declareQueue(myQueueName, false, false, false, false, false)
            .bindQueue(myQueueName, exchangeName, routingKey, false)
            .consumeBasic(myQueueName, myConsumerTag, false, true, true, false);
    };

    /** Dispatches on the UTF-8 text of an incoming message. */
    var messageHandler = function (m) {
        alert("A");
        var body = m.body.getString(Charset.UTF8);

        if (body === "start_auction") {
            ss();
        }
        if (body === "new_bidder") {
            alert("New Bidder");
        }
    };

    disconnect.onclick = function () {
        log("DISCONNECT");
        if (!amqp) {
            log("Cannot disconnect: not connected");
            return;
        }
        amqp.disconnect();
    };

    send.onclick = function () {
        if (!publishChannelReady) {
            // Without this guard a click before connecting throws, and a
            // click after the channel was closed fails without any feedback.
            log("Cannot send: publish channel is not open");
            return;
        }
        log("SENDING MESSAGE ....");
        var body = new ByteBuffer();
        body.putString("new_bidder", Charset.UTF8);
        body.flip();
        var headers = {};
        publishChannel.publishBasic(body, headers, consumeExchange, routingKey, false, false);
    };

    $("clear").onclick = function () {
        while (logPane.childNodes.length > 0) {
            logPane.removeChild(logPane.lastChild);
        }
    };
}

任何幫助將不勝感激。

回答

0

明白了……必須刪除 declareExchange 的代碼,因爲該 exchange 已經存在。不過奇怪的是,這個錯誤只出現在 direct 交換上,fanout 交換則沒有問題。:)