2014-10-06 24 views
0

我已經編寫了一個生產者應用程序,它通過使用activeMQ中的Executer服務來排隊JMS消息,並且它工作的很好,但問題是需要很長時間來排隊消息。activeMQ生產者花費很長時間發送郵件

有三個文件: 1. ExecutePushServer.java 2. ActiveMQProducer.java 3. SendPush.java

ExecutePushServer.java:

package com.rh.pushserver; 

import java.io.BufferedReader; 
import java.io.FileReader; 
import java.io.IOException; 
import java.util.ArrayList; 
import java.util.List; 
import java.util.Properties; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

import org.apache.log4j.Logger; 

public class ExecutePushServer { 

/** 
* @uthor ankit 
*/ 

static int maxThread = 0; 
static BufferedReader br = null; 
static String fileLocation = null; 
static List<String> tokenList = new ArrayList<String>(); 
private static String txt; 
static Properties configFile = new Properties(); 
private final static Logger logger = Logger 
     .getLogger(ExecutePushServer.class.getName()); 

public static void main(String[] args) { 
    // TODO Auto-generated method stub 
    try { 
     configFile.load(ExecutePushServer.class.getClassLoader() 
       .getResourceAsStream("config.properties")); 
     maxThread = Integer.valueOf(configFile.getProperty("POOL_SIZE")); 

     fileLocation = configFile.getProperty("LOCATION"); 

     txt = configFile.getProperty("txt"); 
     logger.info("Message text is : " + txt); 

     br = new BufferedReader(new FileReader(fileLocation)); 

     ActiveMQProducer mqProducer = new ActiveMQProducer(); 

     tokenList = getList(br); 
     logger.info("tokenList created."); 


     ExecutorService executor = Executors.newFixedThreadPool(maxThread); 
     for (String id : tokenList) { 
      Runnable work = new SendPush(mqProducer, id); 
      executor.execute(work); 
     } 

     // This will make the executor accept no new threads 
     // and finish all existing threads in the queue 
     logger.info("All Ids Entered in Pool."); 
     executor.shutdown(); 

     while (!executor.awaitTermination(10, TimeUnit.MINUTES)) { 
      logger.info("Inside awaitTermination"); 
     } 

     mqProducer.closeConnection(); 

    } catch (IOException e) { 
     logger.error("Error in Reading File" + e); 

    } catch (InterruptedException e) { 
     // TODO Auto-generated catch block 
     logger.error("Error in termination of executer" + e); 
    } finally { 
     try { 
      if (br != null) 
       br.close(); 

     } catch (IOException ex) { 
      ex.printStackTrace(); 
     } 
    } 

} 

private static List<String> getList(BufferedReader br) { 
    // TODO Auto-generated method stub 
    String currentLine; 
    try { 
    while ((currentLine = br.readLine()) != null) { 
     tokenList.add(currentLine); 
    } 

    return tokenList; 

    } catch (IOException e) { 
     logger.error("Error occured in creating tokenList !" + e); 
     return null; 
    } 
} 

} 

ActiveMQProducer.java

package com.rh.pushserver; 


import java.io.IOException; 
import java.util.Properties; 

import javax.jms.Connection; 
import javax.jms.ConnectionFactory; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.MapMessage; 
import javax.jms.MessageProducer; 
import javax.jms.Session; 

import org.apache.activemq.ActiveMQConnection; 
import org.apache.activemq.ActiveMQConnectionFactory; 
import org.apache.log4j.Logger; 

public class ActiveMQProducer { 

/** 
* @uthor ankit 
*/ 


private final String url = ActiveMQConnection.DEFAULT_BROKER_URL; 
private final String subject = "PUSH_NOTIFICATION"; 
private Connection connection; 
private Session session; 
private String txt=null; 
private MessageProducer producer; 
private MapMessage mapMessage; 
static Properties configFile = new Properties(); 
private final static Logger logger=Logger.getLogger(ActiveMQProducer.class.getName()); 

public ActiveMQProducer() { 
    try { 
     ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); 
     connection = connectionFactory.createConnection(); 
     connection.start(); 

     logger.info("Connection Created."); 

     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     Destination destination = session.createQueue(subject); 
     producer = session.createProducer(destination); 

     logger.info("Producer generated"); 

       configFile.load(ActiveMQProducer.class.getClassLoader().getResourceAsStream("config.properties")); 

     txt=configFile.getProperty("txt"); 

     mapMessage = session.createMapMessage(); 
    } catch (JMSException e) { 
     // TODO Auto-generated catch block 
     logger.error("Error JMS Exception occured in creating connection"+e); 
    } catch (IOException e) { 
     // TODO Auto-generated catch block 
     logger.error("Exception occured while opening file "+e); 
    } 
} 
public MessageProducer getProducer() { 
    return producer; 
} 

public void enqueueMessage(String id){ 
    try { 
     mapMessage.setString("ID", id); 
     mapMessage.setString("DISPLAY_STRING", txt); 
     mapMessage.setInt("BADGE_COUNT", 1); 
     mapMessage.setString("DEVICE_TYPE", "ANDROID"); 

     producer.send(mapMessage); 
     logger.info("Sent on : "+id); 

    } catch (JMSException e) { 
     // TODO Auto-generated catch block 
     logger.error("Error while Enqueue"+e); 
    } 
} 

public void closeConnection(){ 
    try { 
     connection.close(); 
     logger.info("Connection closed"); 
    } catch (JMSException e) { 
     // TODO Auto-generated catch block 
     logger.error("Error in connection closer"+e); 
    } 
} 

}

SendPush.java

package com.rh.pushserver; 



public class SendPush implements Runnable { 
/** 
* @uthor ankit 
*/ 

private String id; 
private ActiveMQProducer mqProducer; 


public SendPush(ActiveMQProducer mqProducer,String id) { 

    this.id=id; 
    this.mqProducer=mqProducer; 
} 

@Override 
public void run() { 

    mqProducer.enqueueMessage(id); 
} 

} 

請幫助我!

+0

每個消息都需要很長時間才能入隊嗎?第一條信息的長度只有百分之一還是千分之一? – Tim 2014-10-06 18:17:43

+0

無論如何,你發送了多少條消息?看起來你每條消息使用一個線程,這聽起來像比我想要的更多的開銷,但如果你只發送一些消息,那麼也許這不是一個問題。 – Tim 2014-10-06 18:21:34

+0

我一次發送大約50萬條消息,實際上我需要停止兩個多小時的流程。 – 2014-10-07 05:48:36

回答

0

我首先看的是你的線程使用情況;你正在爲每封郵件創建一個新線程,這絕對是你表現不快的重要組成部分。爲什麼你不能讓你的線程運行一個循環來發送消息,直到它們用完消息發送,然後有N個線程來分割工作?

你也可能想要在一個單獨的線程中運行你的閱讀器邏輯,在這個線程中它儘可能快地讀取文件,並把它讀取到線程的東西傳遞給它,所以你不必等待該文件被讀取甚至開始。確保您使用數據結構來在讀取器線程之間傳遞數據,並且消息線程是線程安全的!

一旦你這樣做,如果速度不是你想要的地方,看看代理的配置。 (如果你想讓別人看看它,請在這裏發佈。)特別是,如果你的消息是持久的,那麼看看它們在哪裏被持久化,看看是否還有其他的選擇會更快。 (JDBC存儲通常是最慢的持久性選項,因此請考慮其他選項。)或者,您甚至可以使消息不持久以使其更快;你必須決定你是否可以接受這種折衷。弄清楚你的消息是異步傳遞還是同步傳遞;如果同步,則可能需要啓用異步。並確保生產者流量控制不在踢(檢查經紀人日誌);如果是這樣,那麼你的消費者可能太慢,並放慢你的生產者。