我已經編寫了一個生產者應用程序,它通過 ExecutorService 向 ActiveMQ 排隊 JMS 消息,並且它工作得很好,但問題是消息入隊需要很長時間。(標題:ActiveMQ 生產者花費很長時間發送消息)
有三個文件: 1. ExecutePushServer.java 2. ActiveMQProducer.java 3. SendPush.java
ExecutePushServer.java:
package com.rh.pushserver;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
/**
 * Command-line entry point that reads ids from a text file (one per line) and
 * submits one {@link SendPush} task per id to a fixed-size thread pool.
 *
 * <p>Settings are read from {@code config.properties} on the classpath:
 * {@code POOL_SIZE} (number of worker threads), {@code LOCATION} (path of the
 * id file) and {@code txt} (message text, only logged here).
 *
 * @author ankit
 */
public class ExecutePushServer {
    static int maxThread = 0;
    static BufferedReader br = null;
    static String fileLocation = null;
    static List<String> tokenList = new ArrayList<String>();
    private static String txt;
    static Properties configFile = new Properties();
    private final static Logger logger = Logger
            .getLogger(ExecutePushServer.class.getName());

    /**
     * Loads the configuration, reads every id from the configured file, and
     * enqueues one message per id through a shared {@link ActiveMQProducer}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        if (!loadConfiguration()) {
            return;
        }

        // Read the whole file before opening the JMS connection so that an
        // unreadable file does not leave a broker connection behind.
        try {
            tokenList = readTokens(fileLocation);
        } catch (IOException e) {
            logger.error("Error in Reading File", e);
            return;
        }
        logger.info("tokenList created.");

        ActiveMQProducer mqProducer = new ActiveMQProducer();
        ExecutorService executor = Executors.newFixedThreadPool(maxThread);
        try {
            for (String id : tokenList) {
                executor.execute(new SendPush(mqProducer, id));
            }
            logger.info("All Ids Entered in Pool.");
            // Accept no new tasks, then wait for the queued ones to finish.
            executor.shutdown();
            while (!executor.awaitTermination(10, TimeUnit.MINUTES)) {
                logger.info("Inside awaitTermination");
            }
        } catch (InterruptedException e) {
            logger.error("Error in termination of executer", e);
            executor.shutdownNow();
            // Restore the flag so code further up the stack can see the interrupt.
            Thread.currentThread().interrupt();
        } finally {
            // Worker threads are non-daemon; make sure they cannot keep the JVM
            // alive if submission failed part-way through.
            if (!executor.isShutdown()) {
                executor.shutdownNow();
            }
            mqProducer.closeConnection();
        }
    }

    /**
     * Loads {@code config.properties} and validates the values this class needs.
     *
     * @return {@code true} when the pool size and file location are usable;
     *         {@code false} after logging the reason otherwise
     */
    private static boolean loadConfiguration() {
        java.io.InputStream in = ExecutePushServer.class.getClassLoader()
                .getResourceAsStream("config.properties");
        if (in == null) {
            logger.error("config.properties not found on classpath");
            return false;
        }
        try {
            configFile.load(in);
        } catch (IOException e) {
            logger.error("Error in Reading File", e);
            return false;
        } finally {
            try {
                in.close();
            } catch (IOException ex) {
                logger.warn("Could not close config.properties", ex);
            }
        }

        String poolSize = configFile.getProperty("POOL_SIZE");
        try {
            maxThread = Integer.parseInt(poolSize == null ? "" : poolSize.trim());
        } catch (NumberFormatException e) {
            logger.error("POOL_SIZE is missing or not a number: " + poolSize, e);
            return false;
        }
        if (maxThread <= 0) {
            // Executors.newFixedThreadPool rejects non-positive sizes.
            logger.error("POOL_SIZE must be positive but was " + maxThread);
            return false;
        }

        fileLocation = configFile.getProperty("LOCATION");
        if (fileLocation == null) {
            logger.error("LOCATION is missing from config.properties");
            return false;
        }

        txt = configFile.getProperty("txt");
        logger.info("Message text is : " + txt);
        return true;
    }

    /**
     * Opens the id file, reads every line and always closes the reader.
     *
     * @param location path of the id file
     * @return the lines of the file, in order
     * @throws IOException if the file cannot be opened or read
     */
    private static List<String> readTokens(String location) throws IOException {
        br = new BufferedReader(new FileReader(location));
        try {
            return getList(br);
        } finally {
            try {
                br.close();
            } catch (IOException ex) {
                logger.warn("Could not close " + location, ex);
            }
        }
    }

    /**
     * Reads all remaining lines from the reader.
     *
     * @param br reader positioned at the first id
     * @return a new list containing one entry per line; never {@code null}
     * @throws IOException if reading fails
     */
    private static List<String> getList(BufferedReader br) throws IOException {
        List<String> ids = new ArrayList<String>();
        String currentLine;
        while ((currentLine = br.readLine()) != null) {
            ids.add(currentLine);
        }
        return ids;
    }
}
ActiveMQProducer.java:
package com.rh.pushserver;
import java.io.IOException;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;
/**
 * Thin wrapper around one JMS connection, session and producer that sends
 * {@link MapMessage}s to the {@code PUSH_NOTIFICATION} queue.
 *
 * <p>A single instance is shared by many worker threads. Because the session
 * and producer are shared, {@link #enqueueMessage(String)} is synchronized and
 * builds a fresh message for every call, so one thread can never overwrite the
 * fields of a message another thread is about to send.
 *
 * <p>NOTE(review): if throughput is the concern, the usual ActiveMQ options are
 * asynchronous sends or a transacted session committed in batches. Both change
 * delivery guarantees, so they are not enabled here. Confirm against the
 * broker documentation before adopting either.
 *
 * @author ankit
 */
public class ActiveMQProducer {
    private final String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private final String subject = "PUSH_NOTIFICATION";
    private Connection connection;
    private Session session;
    private String txt = null;
    private MessageProducer producer;
    static Properties configFile = new Properties();
    private final static Logger logger = Logger.getLogger(ActiveMQProducer.class.getName());

    /**
     * Loads the message text from {@code config.properties}, then connects to
     * the default broker URL and creates the queue producer.
     *
     * <p>Failures are logged rather than thrown. After a failure the instance
     * is inert: {@link #getProducer()} returns {@code null} and
     * {@link #enqueueMessage(String)} logs an error and does nothing.
     */
    public ActiveMQProducer() {
        try {
            loadMessageText();
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            connection = connectionFactory.createConnection();
            connection.start();
            logger.info("Connection Created.");
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(subject);
            producer = session.createProducer(destination);
            logger.info("Producer generated");
        } catch (JMSException e) {
            logger.error("Error JMS Exception occured in creating connection", e);
            // Do not leave a half-initialised connection open.
            producer = null;
            closeQuietly();
        } catch (IOException e) {
            logger.error("Exception occured while opening file ", e);
        }
    }

    /**
     * Reads the {@code txt} property from {@code config.properties}.
     *
     * @throws IOException if the resource is missing or cannot be read
     */
    private void loadMessageText() throws IOException {
        java.io.InputStream in = ActiveMQProducer.class.getClassLoader()
                .getResourceAsStream("config.properties");
        if (in == null) {
            throw new IOException("config.properties not found on classpath");
        }
        try {
            configFile.load(in);
        } finally {
            in.close();
        }
        txt = configFile.getProperty("txt");
    }

    /**
     * @return the underlying producer, or {@code null} if construction failed
     */
    public MessageProducer getProducer() {
        return producer;
    }

    /**
     * Sends one map message carrying the given id and the configured text.
     *
     * <p>Synchronized because every caller shares the same session and producer.
     *
     * @param id value stored under the {@code ID} key of the message
     */
    public synchronized void enqueueMessage(String id) {
        if (producer == null || session == null) {
            logger.error("Error while Enqueue: producer is not initialised, id=" + id);
            return;
        }
        try {
            // A new message per send: a shared instance could be modified by
            // another caller between setString and send.
            MapMessage message = session.createMapMessage();
            message.setString("ID", id);
            message.setString("DISPLAY_STRING", txt);
            message.setInt("BADGE_COUNT", 1);
            message.setString("DEVICE_TYPE", "ANDROID");
            producer.send(message);
            logger.info("Sent on : " + id);
        } catch (JMSException e) {
            logger.error("Error while Enqueue", e);
        }
    }

    /**
     * Closes the JMS connection if one was opened; safe to call when
     * construction failed.
     */
    public synchronized void closeConnection() {
        closeQuietly();
    }

    /** Closes the connection, logging (not throwing) any JMS failure. */
    private void closeQuietly() {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
            logger.info("Connection closed");
        } catch (JMSException e) {
            logger.error("Error in connection closer", e);
        } finally {
            connection = null;
            session = null;
            producer = null;
        }
    }
}
SendPush.java
package com.rh.pushserver;
/**
 * Runnable that passes a single id to a shared {@link ActiveMQProducer}.
 *
 * @author ankit
 */
public class SendPush implements Runnable {
    private final ActiveMQProducer mqProducer;
    private final String id;

    /**
     * @param mqProducer producer the id is handed to when the task runs
     * @param id         value passed to {@code enqueueMessage}
     */
    public SendPush(ActiveMQProducer mqProducer, String id) {
        this.mqProducer = mqProducer;
        this.id = id;
    }

    /** Delegates to {@code mqProducer.enqueueMessage(id)}. */
    @Override
    public void run() {
        final String target = this.id;
        this.mqProducer.enqueueMessage(target);
    }
}
請幫助我!
是每條消息都需要很長時間才能入隊嗎?還是只有第一條、第一百條或第一千條消息才慢? – Tim 2014-10-06 18:17:43
無論如何,你發送了多少條消息?看起來你每條消息使用一個線程,這聽起來像比我想要的更多的開銷,但如果你只發送一些消息,那麼也許這不是一個問題。 – Tim 2014-10-06 18:21:34
我一次發送大約50萬條消息,實際上運行了兩個多小時後我不得不停止該進程。 – 2014-10-07 05:48:36