2012-12-30 71 views
2

您好,我對 HornetQ 比較陌生。我編寫了一個測試套件,其中包含一個嵌入式 HornetQ 服務器、一個生產者和一個異步消費者。希望我的實現方式是正確的。目前遇到的問題如下(HornetQ:核心 API 生產者與異步消費者的問題):

  1. 當我通過生產者(producer)向隊列發送消息時,調用成功返回;但當我嘗試消費隊列中的消息時,似乎沒有任何消息被消費,就像消費者沒有處於活動狀態一樣。

  2. 當我嘗試使用 setSendAcknowledgementHandler 方法時,出現以下錯誤:

java.lang.IllegalStateException: Cannot set confirmationHandler on a connection with confirmation-window-size < 0. Look at the documentation for more information.
    at org.hornetq.core.protocol.core.impl.ChannelImpl.setCommandConfirmationHandler(ChannelImpl.java:330)
    at org.hornetq.core.client.impl.ClientSessionImpl.setSendAcknowledgementHandler(ClientSessionImpl.java:943)
    at org.hornetq.core.client.impl.DelegatingSession.setSendAcknowledgementHandler(DelegatingSession.java:493)
    at componentsII.QueueProducer.SendMessage(QueueProducer.java:9)
    at componentsII.StartClients.main(StartClients.java:11)

請查看以下類:

連接和會話

/**
 * Static holder for one shared HornetQ core-API client connection.
 *
 * <p>Connector parameters are supplied through {@link #setMaps(String[], Object[])};
 * the server locator, session factory and session are then created lazily on the
 * first call to {@link #startSession()} and released by {@link #stopSession()}.
 *
 * <p>All public methods synchronize on the class so that the lifecycle state
 * cannot be torn when several threads start and stop the connection. Note that
 * {@link #sharedSession} is a public field read directly by other classes; those
 * reads are not covered by this lock.
 */
public class QueueConnection {

    private static org.hornetq.api.core.TransportConfiguration transport;
    /** Locator that produced {@link #sharedfactory}; retained only so it can be closed. */
    private static org.hornetq.api.core.client.ServerLocator sharedLocator;
    private static org.hornetq.api.core.client.ClientSessionFactory sharedfactory;
    /** The shared client session, or {@code null} while no session is open. */
    public static org.hornetq.api.core.client.ClientSession sharedSession;
    private static java.util.HashMap<String, Object> maps;
    private static boolean started = false;

    /**
     * Builds the connector parameter map from two parallel arrays.
     *
     * @param key   parameter names
     * @param value parameter values; {@code value[i]} belongs to {@code key[i]}
     *              If either array is {@code null} or the lengths differ, the
     *              parameter map is reset to {@code null}.
     */
    public static synchronized void setMaps(String[] key, Object[] value) {
        if (key == null || value == null || key.length != value.length) {
            maps = null;
            return;
        }
        java.util.HashMap<String, Object> params = new java.util.HashMap<String, Object>();
        for (int i = 0; i < key.length; i++) {
            params.put(key[i], value[i]);
        }
        maps = params;
    }

    /**
     * Returns the shared session, creating it on first use.
     *
     * @return the session, or {@code null} if it could not be created
     */
    private static org.hornetq.api.core.client.ClientSession session() {
        if (sharedSession == null) {
            try {
                org.hornetq.api.core.client.ClientSessionFactory sessionFactory = factory();
                if (sessionFactory != null) {
                    sharedSession = sessionFactory.createSession(true, true, 0);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return sharedSession;
    }

    /**
     * Queries the server for the number of messages in a queue.
     *
     * @param queueName name of the queue to query
     * @return the reported message count, or {@code 0} if no session is open or
     *         the query fails
     */
    public static synchronized int size(String queueName) {
        org.hornetq.api.core.client.ClientSession current = sharedSession;
        if (current == null) {
            // Previously this dereferenced a null session and threw an uncaught NPE.
            return 0;
        }
        try {
            org.hornetq.api.core.client.ClientSession.QueueQuery result =
                    current.queueQuery(new org.hornetq.api.core.SimpleString(queueName));
            // Clamp instead of letting a large count wrap around in the int cast.
            return (int) Math.min(Integer.MAX_VALUE, result.getMessageCount());
        } catch (org.hornetq.api.core.HornetQException e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * Creates the shared session if necessary and starts it.
     * Does nothing when the session is already open and started.
     */
    public static synchronized void startSession() {
        try {
            if (started && sharedSession != null) {
                System.out.println("Client Session already started");
                return;
            }
            org.hornetq.api.core.client.ClientSession current = session();
            if (current == null) {
                System.err.println("Client Session could not be created");
                return;
            }
            current.start();
            started = true;
            System.out.println("Client Session started");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Stops and closes the shared session, then closes the session factory and
     * server locator. Safe to call when nothing is open; in that case no
     * connection is created just to be torn down again.
     */
    public static synchronized void stopSession() {
        try {
            if (sharedSession != null) {
                sharedSession.stop();
                sharedSession.close();
                System.out.println("Client Session stopped");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release everything even if stopping the session failed, and reset
            // the flag so a later startSession() opens a fresh connection.
            if (sharedfactory != null) {
                sharedfactory.close();
            }
            if (sharedLocator != null) {
                sharedLocator.close();
            }
            sharedSession = null;
            sharedfactory = null;
            sharedLocator = null;
            started = false;
        }
    }

    /** Builds the Netty connector configuration from the current parameter map. */
    private static org.hornetq.api.core.TransportConfiguration TransportConfigs() {
        transport = new org.hornetq.api.core.TransportConfiguration(
                org.hornetq.core.remoting.impl.netty.NettyConnectorFactory.class.getName(), maps);
        return transport;
    }

    /**
     * Returns the shared session factory, creating it (and its locator) on first use.
     *
     * @return the factory, or {@code null} if the connection could not be established
     */
    private static org.hornetq.api.core.client.ClientSessionFactory factory() {
        if (sharedfactory != null) {
            return sharedfactory;
        }
        org.hornetq.api.core.client.ServerLocator locator = null;
        try {
            locator = org.hornetq.api.core.client.HornetQClient.createServerLocator(true, TransportConfigs());
            locator.setAckBatchSize(0);
            locator.setReconnectAttempts(3);
            // A non-negative confirmation window is required before a send
            // acknowledgement handler may be set on a session.
            // NOTE(review): the value is presumably in bytes - confirm that 2 is intended.
            locator.setConfirmationWindowSize(2);
            sharedfactory = locator.createSessionFactory();
            sharedLocator = locator;
        } catch (Exception e) {
            e.printStackTrace();
            if (locator != null) {
                // Do not leak the locator when the factory could not be created.
                locator.close();
            }
        }
        return sharedfactory;
    }

}

消費者

public class QueueConsumer { 


public static void Recieve(String queuename){ 
    try { 
     org.hornetq.api.core.client.ClientConsumer consumer = QueueConnection.sharedSession.createConsumer(queuename); 
      consumer.setMessageHandler(new msghandler()); 
    } catch (org.hornetq.api.core.HornetQException e) { 
     e.printStackTrace(); 
    } catch(Exception e){ 
     e.printStackTrace(); 
    } 
} 

private static class msghandler implements org.hornetq.api.core.client.MessageHandler { 
    @Override 
    public void onMessage(org.hornetq.api.core.client.ClientMessage msg) { 
     System.out.println("Message consumed ~"+msg.getStringProperty("myMsg")); 

    } 
} 

}

生產者

public class QueueProducer { 

public static void SendMessage(String queuename,String msg){ 
    try{ 
     QueueConnection.sharedSession.setSendAcknowledgementHandler(new acknowledegeHandler()); 
     org.hornetq.api.core.client.ClientProducer producer= QueueConnection.sharedSession.createProducer(queuename); 
     org.hornetq.api.core.client.ClientMessage message = QueueConnection.sharedSession.createMessage(org.hornetq.api.core.client.ClientMessage.TEXT_TYPE,true); 
     message.putStringProperty("myMsg", msg); 
     producer.send(queuename,message); 
     System.out.println("Message Sent to "+queuename); 

    }catch(org.hornetq.api.core.HornetQException e){ 
     e.printStackTrace(); 
    }catch(Exception e){ 
     e.printStackTrace(); 
    } 
} 

private static class acknowledegeHandler implements org.hornetq.api.core.client.SendAcknowledgementHandler{ 
    @Override 
    public void sendAcknowledged(org.hornetq.api.core.Message msg) { 
     System.out.println("Received acknowledgement for message ~: "+msg.getStringProperty("myMsg")); 
    } 
} 

}

初始化客戶端

/**
 * Demo driver: one thread sends nine messages to {@code queues.Queue1}, a second
 * thread waits five seconds and then consumes them asynchronously.
 */
public class StartClients {

    /** Upper bound on how long the consumer keeps its session open for delivery. */
    private static final long DRAIN_TIMEOUT_MILLIS = 5000L;
    /** Interval between queue-size checks while waiting for delivery. */
    private static final long DRAIN_POLL_MILLIS = 100L;

    /** Points the shared connection at localhost:5664 and starts its session. */
    private static void initialize() {
        String keys[] = {
            org.hornetq.core.remoting.impl.netty.TransportConstants.HOST_PROP_NAME,
            org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME
        };
        Object values[] = {"localhost", 5664};

        QueueConnection.setMaps(keys, values);
        QueueConnection.startSession();
    }

    /**
     * Blocks until the queue reports no messages or the timeout elapses.
     *
     * <p>The consumer handler is asynchronous, so the session has to stay open
     * for a while after the handler is registered; otherwise it is stopped
     * before anything can be delivered.
     *
     * @param queueName queue to watch
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private static void awaitDrained(String queueName) throws InterruptedException {
        long deadline = System.currentTimeMillis() + DRAIN_TIMEOUT_MILLIS;
        do {
            Thread.sleep(DRAIN_POLL_MILLIS);
        } while (QueueConnection.size(queueName) > 0 && System.currentTimeMillis() < deadline);
    }

    public static void main(String[] args) {

        new Thread() {
            @Override
            public void run() {
                System.out.println("Producer Thread");
                StartClients.initialize();
                String queueName = "queues.Queue1";
                try {
                    for (int a = 1; a < 10; a++) {
                        QueueProducer.SendMessage(queueName, "Message "+a+" to the embedded HornetQ Server");
                    }
                    System.out.println("Queue "+queueName+" has "+QueueConnection.size(queueName)+" messages on send");
                } finally {
                    QueueConnection.stopSession();
                }
            }
        }.start();

        new Thread() {
            @Override
            public void run() {
                try {
                    // Give the producer thread time to finish and release the connection.
                    Thread.sleep(5000);
                    System.out.println();
                    System.out.println("Consumer Thread");
                    StartClients.initialize();
                    String queueName = "queues.Queue1";
                    try {
                        QueueConsumer.Recieve(queueName);
                        // Recieve() only registers a handler; wait for the deliveries
                        // before reporting the queue size and closing the session.
                        awaitDrained(queueName);
                        System.out.println("Queue "+queueName+" has "+QueueConnection.size(queueName)+" messages after consumption");
                    } finally {
                        QueueConnection.stopSession();
                    }
                } catch (java.lang.InterruptedException e) {
                    // Preserve the interrupt for whoever owns this thread.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }.start();

    }

}

嵌入式服務器

import java.util.ArrayList; 
import java.util.HashMap; 
import java.util.HashSet; 
import java.util.List; 
import java.util.Map; 
import java.util.logging.Level; 
import java.util.logging.Logger; 
import org.hornetq.api.core.TransportConfiguration; 
import org.hornetq.core.config.Configuration; 
import org.hornetq.core.config.CoreQueueConfiguration; 
import org.hornetq.core.config.impl.ConfigurationImpl; 
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory; 
import org.hornetq.core.remoting.impl.netty.TransportConstants; 
import org.hornetq.core.server.JournalType; 
import org.hornetq.core.server.embedded.EmbeddedHornetQ; 
/**
 * Embedded HornetQ server exposing four Netty acceptors (ports 5664-5667 on
 * localhost) and four durable queues named {@code queues.Queue1} to
 * {@code queues.Queue4}.
 */
public class QueueServer {

    /** First and last acceptor port; one acceptor is created per port in this range. */
    private static final int FIRST_PORT = 5664;
    private static final int LAST_PORT = 5667;

    /** Builds the acceptor parameters shared by every port. */
    private static Map<String, Object> acceptorParams(int port) {
        Map<String, Object> params = new HashMap<String, Object>();
        params.put(TransportConstants.HOST_PROP_NAME, "localhost");
        params.put(TransportConstants.PORT_PROP_NAME, port);
        params.put(TransportConstants.USE_NIO_PROP_NAME, true);
        params.put(TransportConstants.TCP_NODELAY_PROPNAME, true);
        params.put(TransportConstants.NIO_REMOTING_THREADS_PROPNAME, 3);
        return params;
    }

    /**
     * Configures and starts the embedded server, keeps it running for
     * 6,000,000 ms, then stops it. The server is also stopped when the waiting
     * thread is interrupted. Failures are logged at SEVERE level.
     */
    public static void StartServer() {

        try {

            //Connection configurations
            HashSet<TransportConfiguration> transports = new HashSet<TransportConfiguration>();
            for (int port = FIRST_PORT; port <= LAST_PORT; port++) {
                transports.add(new TransportConfiguration(NettyAcceptorFactory.class.getName(), acceptorParams(port)));
            }

            //Queues Configurations
            List<CoreQueueConfiguration> queueConfigs = new ArrayList<CoreQueueConfiguration>();
            String queueName = "queues.Queue";
            for (int x = 1; x < 5; x++) {
                String name = queueName.concat(String.valueOf(x));
                // Address and queue name are identical; no filter; durable queue.
                queueConfigs.add(new CoreQueueConfiguration(name, name, null, true));
            }

            //Server configurations
            Configuration config = new ConfigurationImpl();
            config.setAcceptorConfigurations(transports);
            config.setQueueConfigurations(queueConfigs);
            config.setJournalType(JournalType.NIO);
            config.setSecurityEnabled(false);

            //Starting server
            EmbeddedHornetQ embedded = new EmbeddedHornetQ();
            embedded.setConfiguration(config);
            embedded.start();
            try {
                Thread.sleep(6000000);
            } catch (InterruptedException ex) {
                // Treat an interrupt as a shutdown request and keep the flag set.
                Thread.currentThread().interrupt();
            } finally {
                // Always release ports and journal once the server has started.
                embedded.stop();
            }

        } catch (Exception ex) {
            Logger.getLogger(QueueServer.class.getName()).log(Level.SEVERE, null, ex);
        }

    }

    public static void main(String[] args) {
        StartServer();
    }

}

我可能是哪裏弄錯了?

回答