2012-06-07 49 views
1

在我的申請,我有ActiveMQ的從客戶端將消息發送到服務器和副versa.I運行它作爲當客戶機發送該消息的獨立server.So,該消息被傳遞在activeMq隊列中,然後由服務器(我的應用程序)檢索當且僅當事務在本地完成時,這意味着客戶機和服務器(我的應用程序)位於同一臺計算機中。但是,當我從一個客戶端和另一個客戶端中的兩個不同計算機意義服務器運行客戶端和服務器時,客戶端只能建立與服務器的連接,但消息不會傳遞到activeMq隊列。我認爲這是activeMq問題。使用Tomcat(JAVA)包的ActiveMQ

誰能告訴我如何解決這個問題? 感謝

這裏是經過客戶端發送到隊列中的數據的代碼。

package event.activeMq; 
import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.Date; 
import java.util.Iterator; 

import javax.jms.Connection; 
import javax.jms.DeliveryMode; 
import javax.jms.Destination; 
import javax.jms.MessageProducer; 
import javax.jms.Session; 
import javax.jms.TextMessage; 

import org.apache.activemq.ActiveMQConnection; 
import org.apache.activemq.ActiveMQConnectionFactory; 
import org.apache.activemq.console.command.store.amq.CommandLineSupport; 
import org.apache.activemq.util.IndentPrinter; 

public class ProducerTool extends Thread { 

    private Destination destination; 
    private int messageCount = 1; 
    private long sleepTime; 
    private boolean verbose = true; 
    private int messageSize = 1000; 
    private static int parallelThreads = 1; 
    private long timeToLive; 
    private String user = ActiveMQConnection.DEFAULT_USER; 
    private String password = ActiveMQConnection.DEFAULT_PASSWORD; 
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL; 
    private String subject = "CLOUDD.DEFAULT"; 
    private boolean topic; 
    private boolean transacted; 
    private boolean persistent; 
    private static Object lockResults = new Object(); 
    private static String DateTime=""; 
    private static String TaskID=""; 
    private static String UniqueEventID=""; 
    private static String Generator=""; 
    private static String GeneratorBuildVsn=""; 
    private static String Severity=""; 
    private static String EventText=""; 
    private static String SubsystemID=""; 
    private static String EventNumber=""; 
    private static String atmId=""; 


    public void element(String[] element) { 
     this.DateTime = element[0]; 
     this.TaskID = element[1]; 
     this.Generator = element[2]; 
     this.Severity = element[3]; 
     this.EventText = element[4]; 
     this.SubsystemID = element[5]; 
     this.EventNumber = element[6]; 
     this.GeneratorBuildVsn = element[7]; 
     this.UniqueEventID = element[8]; 
     this.atmId = element[9]; 
    } 
    public static void main(String[] args) { 
     System.out.println("came here"); 
     ArrayList<ProducerTool> threads = new ArrayList(); 
     ProducerTool producerTool = new ProducerTool(); 
     producerTool.element(args); 

     producerTool.showParameters(); 
     for (int threadCount = 1; threadCount <= parallelThreads; threadCount++) { 
      producerTool = new ProducerTool(); 
      CommandLineSupport.setOptions(producerTool, args); 
      producerTool.start(); 
      threads.add(producerTool); 
     } 

     while (true) { 
      Iterator<ProducerTool> itr = threads.iterator(); 
      int running = 0; 
      while (itr.hasNext()) { 
       ProducerTool thread = itr.next(); 
       if (thread.isAlive()) { 
        running++; 
       } 
      } 
      if (running <= 0) { 
       System.out.println("All threads completed their work"); 
       break; 
      } 
      try { 
       Thread.sleep(1000); 
      } catch (Exception e) { 
      } 
     } 
    } 

    public void showParameters() { 
     System.out.println("Connecting to URL: " + url); 
     System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject); 
     System.out.println("Using " + (persistent ? "persistent" : "non-persistent") + " messages"); 
     System.out.println("Sleeping between publish " + sleepTime + " ms"); 
     System.out.println("Running " + parallelThreads + " parallel threads"); 

     if (timeToLive != 0) { 
      // System.out.println("Messages time to live " + timeToLive + " ms"); 
     } 
    } 

    public void run() { 
     Connection connection = null; 
     try { 
      // Create the connection. 
      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); 
      connection = connectionFactory.createConnection(); 
      connection.start(); 

      // Create the session 
      Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); 
      if (topic) { 
       destination = session.createTopic(subject); 
      } else { 
       destination = session.createQueue(subject); 
      } 

      // Create the producer. 
      MessageProducer producer = session.createProducer(destination); 
      if (persistent) { 
       producer.setDeliveryMode(DeliveryMode.PERSISTENT); 
      } else { 
       producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 
      } 
      if (timeToLive != 0) { 
       producer.setTimeToLive(timeToLive); 
      } 

      // Start sending messages 
      sendLoop(session, producer); 

      // System.out.println("[" + this.getName() + "] Done."); 

      synchronized (lockResults) { 
       ActiveMQConnection c = (ActiveMQConnection) connection; 
       // System.out.println("[" + this.getName() + "] Results:\n"); 
       c.getConnectionStats().dump(new IndentPrinter()); 
      } 

     } catch (Exception e) { 
      // System.out.println("[" + this.getName() + "] Caught: " + e); 
      e.printStackTrace(); 
     } finally { 
      try { 
       connection.close(); 
      } catch (Throwable ignore) { 
      } 
     } 
    } 

    protected void sendLoop(Session session, MessageProducer producer) throws Exception { 

     for (int i = 0; i < messageCount || messageCount == 0; i++) { 

      TextMessage message = session.createTextMessage(createMessageText(i)); 
      if (verbose) { 
       String msg = message.getText(); 
       if (msg.length() > 50) { 
        msg = msg.substring(0, 50) + "..."; 
       } 
       // System.out.println("[" + this.getName() + "] Sending message: '" + msg + "'"); 
      } 

      producer.send(message); 

      if (transacted) { 
       // System.out.println("[" + this.getName() + "] Committing " + messageCount + " messages"); 
       session.commit(); 
      } 
      Thread.sleep(sleepTime); 
     } 
    } 

    private String createMessageText(int index) { 
     StringBuffer buffer = new StringBuffer(messageSize); 

     buffer.append("DateTime "+DateTime+" EventNumber "+EventNumber+" TaskID "+TaskID+" AtmId "+atmId+ 
       " Generator "+Generator+" GeneratorBuildVsn "+GeneratorBuildVsn+" Severity "+Severity+ 
       " UniqueEventID "+UniqueEventID+" EventText "+EventText+" SubsystemID "+SubsystemID+" End "); 
     if (buffer.length() > messageSize) { 
      return buffer.substring(0, messageSize); 
     } 
     for (int i = buffer.length(); i < messageSize; i++) { 
      buffer.append(' '); 
     } 

     DateTime=""; 
     EventNumber=""; 
     TaskID=""; 
     atmId=""; 
     Generator=""; 
     GeneratorBuildVsn=""; 
     Severity=""; 
     UniqueEventID=""; 
     EventText=""; 
     SubsystemID=""; 

     return buffer.toString();   
    } 

    public void setPersistent(boolean durable) { 
     this.persistent = durable; 
    } 

    public void setMessageCount(int messageCount) { 
     this.messageCount = messageCount; 
    } 

    public void setMessageSize(int messageSize) { 
     this.messageSize = messageSize; 
    } 

    public void setPassword(String pwd) { 
     this.password = pwd; 
    } 

    public void setSleepTime(long sleepTime) { 
     this.sleepTime = sleepTime; 
    } 

    public void setSubject(String subject) { 
     this.subject = subject; 
    } 

    public void setTimeToLive(long timeToLive) { 
     this.timeToLive = timeToLive; 
    } 

    public void setParallelThreads(int parallelThreads) { 
     if (parallelThreads < 1) { 
      parallelThreads = 1; 
     } 
     this.parallelThreads = parallelThreads; 
    } 

    public void setTopic(boolean topic) { 
     this.topic = topic; 
    } 

    public void setQueue(boolean queue) { 
     this.topic = !queue; 
    } 

    public void setTransacted(boolean transacted) { 
     this.transacted = transacted; 
    } 

    public void setUrl(String url) { 
     this.url = url; 
    } 

    public void setUser(String user) { 
     this.user = user; 
    } 

    public void setVerbose(boolean verbose) { 
     this.verbose = verbose; 
    } 
} 

回答

0

你需要更新這個問題的答案你questin:

  • 你用什麼操作系統?
  • 你有沒有像PC上的軟件防火牆?
  • 你可以在這裏提供ActiveMQ conf文件嗎?
  • 您能否提供一個實現連接建立的功能 ?

UPD: 我do't瞭解你所有的邏輯,但我在這裏THK錯誤:

try { 
    Thread.sleep(1000); 
    } catch (Exception e){  
    e.printStackTrace(); 
    } 

而且永遠永遠永遠捕獲所有異常!這非常危險。 如果你想捕捉異常,你需要處理它。

+0

我使用Windows 7,是的,我有一個防火牆。 – user1254261

+0

服務器和客戶端之間的連接是由tcp/ip套接字 – user1254261