1
我真的不知道瓶頸在哪裏。如何增加activemq + tomee的同步消費者數量?
我從零開始創建了一個新項目來嘗試隔離變量。
現在我必須爲
製作簡單的東西:
public void createTasks() throws JMSException {
for(int i=0;i<1000;i++){
if (i%100 == 0){
System.out.println(i);
}
MyEntity my = new MyEntity("xyz"+i);
my = this.baseService.getMyEntityDAO().addAndFlush(my);
jmsService.enqueue(my);
}
}
JMS
@Singleton
@Lock(LockType.WRITE)
public class JmsService implements Serializable{
public void enqueue(MyEntity target) throws JMSException{
Connection connection = null;
Session session = null;
try {
ConnectionFactory connectionFactory = this.connectionFactory;
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = this.driverJobQueue;
MessageProducer producer = session.createProducer(queue);
Message message = session.createMessage();
message.setLongProperty("id",target.getId());
message.setStringProperty("type", "xyz");
producer.send(message,DeliveryMode.PERSISTENT,1,0);
} finally {
// Clean up
if (session != null){
session.close();
}
if (connection != null){
connection.close();
}
}
}
消費者
import javax.annotation.PostConstruct;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.EJB;
import javax.ejb.MessageDriven;
import javax.inject.Inject;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
@MessageDriven(activationConfig = {
@ActivationConfigProperty(
propertyName = "destinationType",
propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(
propertyName = "destination",
propertyValue = "DriverJobQueue"),
@ActivationConfigProperty(
propertyName = "messageSelector",
propertyValue = "type = 'xyz'"),
@ActivationConfigProperty(
propertyName = "acknowledgeMode",
propertyValue = "Auto-acknowledge"),
@ActivationConfigProperty(
propertyName = "maxSessions",
propertyValue = "30"),
@ActivationConfigProperty(
propertyName = "maxMessagesPerBatch",
propertyValue = "30"),
@ActivationConfigProperty(
propertyName = "maxMessagesPerSessions",
propertyValue = "30") })
public class MyWorker implements MessageListener {
@Override
public void onMessage(Message msg) {
MyEntity entity = null;
Long id = null;
try {
id = msg.getLongProperty("id");
if (entity != null) {
System.out.println(id);
}else{
System.out.println("ops");
}
System.out.println(this+" is processing "+entity);
} catch (Throwable e) {
e.printStackTrace();
}
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this+" finished "+entity);
}
}
tomee.xml
<Resource id="Default JMS Resource Adapter" type="ActiveMQResourceAdapter">
BrokerXmlConfig = xbean:file:/pathto/activemq.xml
ServerUrl = tcp://0.0.0.0:61616
threadPoolSize 30 <<<<<<< does that really works?
</Resource>
<Container id="MyJmsMdbContainer" ctype="MESSAGE">
ResourceAdapter = Default JMS Resource Adapter
</Container>
<Resource id="Fooo" type="javax.jms.ConnectionFactory">
PoolMaxSize 30
</Resource>
<Container id="myAllContainer" type="STATELESS">
PoolSize 30
StrictPooling true
</Container>
<Container id="msg" type="MESSAGE">
InstanceLimit 30
</Container>
<Container id="Foo" type="BMP_ENTITY">
PoolSize 30
</Container>
activemq.xml中
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="true">
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000" />
</pendingMessageLimitStrategy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#oracle-ds" />
</persistenceAdapter>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="128 mb" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb" />
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb" />
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<transportConnector name="tcp" uri="tcp://0.0.0.0:61616" />
</transportConnectors>
</broker>
<bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="oracle.jdbc.OracleDriver" />
<property name="url" value="jdbc:oracle:thin:@localhost:1521:XE" />
<property name="username" value="xxx" />
<property name="password" value="xxx" />
<property name="poolPreparedStatements" value="true" />
<property name="maxActive " value="30" />
</bean>
</beans>
殺-3在卡特琳娜給了我
[[email protected] ~]$ grep prio 2.txt | sort
"ActiveMQ Broker[localhost] Scheduler" daemon prio=10 tid=0x00007fa8fcea3000 nid=0x411e in Object.wait() [0x00007fa8f0d78000]
"ActiveMQ BrokerService[localhost] Task-1" daemon prio=10 tid=0x00007fa88c041800 nid=0x4171 waiting on condition [0x00007fa8e8217000]
"ActiveMQ BrokerService[localhost] Task-2" daemon prio=10 tid=0x00007fa89c5a8800 nid=0x4175 waiting on condition [0x00007fa873dfc000]
"ActiveMQ BrokerService[localhost] Task-3" daemon prio=10 tid=0x00007fa874020000 nid=0x42b5 waiting on condition [0x00007fa8f107b000]
"ActiveMQ InactivityMonitor ReadCheckTimer" daemon prio=10 tid=0x00007fa8a003d800 nid=0x4173 in Object.wait() [0x00007fa873ffe000]
"ActiveMQ InactivityMonitor Worker" daemon prio=10 tid=0x00007fa88c05a800 nid=0x41b3 waiting on condition [0x00007fa8e891e000]
"ActiveMQ InactivityMonitor Worker" daemon prio=10 tid=0x00007fa88c05d000 nid=0x4220 waiting on condition [0x00007fa8f117c000]
"ActiveMQ InactivityMonitor WriteCheckTimer" daemon prio=10 tid=0x00007fa8a003e800 nid=0x4174 in Object.wait() [0x00007fa873efd000]
"ActiveMQ JDBC PA Scheduled Task" daemon prio=10 tid=0x00007fa878003800 nid=0x413d waiting on condition [0x00007fa8f0772000]
"ActiveMQ JDBC PA Scheduled Task" daemon prio=10 tid=0x00007fa89403a000 nid=0x41bc waiting on condition [0x00007fa872bea000]
"ActiveMQ JDBC PA Scheduled Task" daemon prio=10 tid=0x00007fa8a0041800 nid=0x4205 waiting on condition [0x00007fa8728e7000]
"ActiveMQ JDBC PA Scheduled Task" daemon prio=10 tid=0x00007fa8fcf12000 nid=0x4137 waiting on condition [0x00007fa8f0974000]
"ActiveMQ JDBC PA Scheduled Task" daemon prio=10 tid=0x00007fa8fcf15800 nid=0x4138 waiting on condition [0x00007fa8f0671000]
"ActiveMQ Transport Server: tcp://0.0.0.0:61616" daemon prio=10 tid=0x00007fa8fc7b2000 nid=0x4143 runnable [0x00007fa8f137e000]
"ActiveMQ Transport Server Thread Handler: tcp://0.0.0.0:61616" daemon prio=10 tid=0x00007fa8fc7b1000 nid=0x4142 waiting on condition [0x00007fa8f127d000]
"ActiveMQ Transport: tcp:///127.0.0.1:[email protected]" daemon prio=10 tid=0x00007fa89c5a7800 nid=0x4172 runnable [0x00007fa8e8116000]
"ActiveMQ Transport: tcp:///127.0.0.1:[email protected]" daemon prio=10 tid=0x00007fa87400f000 nid=0x41fd runnable [0x00007fa8729e8000]
"ActiveMQ Transport: tcp://oc7612866413.ibm.com/127.0.0.1:[email protected]" prio=10 tid=0x00007fa88c035800 nid=0x4170 runnable [0x00007fa8e851a000]
"ActiveMQ Transport: tcp://oc7612866413.ibm.com/127.0.0.1:[email protected]" prio=10 tid=0x00007fa89c50b800 nid=0x41fc runnable [0x00007fa8e881d000]
"ajp-bio-8009-Acceptor-0" daemon prio=10 tid=0x00007fa8fc13a000 nid=0x4188 runnable [0x00007fa8736f5000]
"ajp-bio-8009-AsyncTimeout" daemon prio=10 tid=0x00007fa8fc13c000 nid=0x4189 waiting on condition [0x00007fa8735f4000]
"Attach Listener" daemon prio=10 tid=0x00007fa8d0001000 nid=0x415b runnable [0x0000000000000000]
"C2 CompilerThread0" daemon prio=10 tid=0x00007fa8fc0a3800 nid=0x40f6 waiting on condition [0x0000000000000000]
"C2 CompilerThread1" daemon prio=10 tid=0x00007fa8fc0a6800 nid=0x40f7 waiting on condition [0x0000000000000000]
"ContainerBackgroundProcessor[StandardEngine[Catalina]]" daemon prio=10 tid=0x00007fa8fc853000 nid=0x4182 waiting on condition [0x00007fa873af9000]
"Default JMS Resource Adapter-worker- - 10" daemon prio=10 tid=0x00007fa8a00a4800 nid=0x42c3 waiting on condition [0x00007fa871fdd000]
"Default JMS Resource Adapter-worker- - 11" daemon prio=10 tid=0x00007fa8a00a6000 nid=0x42c4 waiting on condition [0x00007fa871edc000]
"Default JMS Resource Adapter-worker- - 1" daemon prio=10 tid=0x00007fa89c597800 nid=0x416e waiting on condition [0x00007fa8e8419000]
"Default JMS Resource Adapter-worker- - 2" daemon prio=10 tid=0x00007fa8a005e000 nid=0x42b7 waiting on condition [0x00007fa8727e5000]
"Default JMS Resource Adapter-worker- - 3" daemon prio=10 tid=0x00007fa8a005f000 nid=0x42b8 waiting on condition [0x00007fa8726e4000]
"Default JMS Resource Adapter-worker- - 4" daemon prio=10 tid=0x00007fa8a0063800 nid=0x42b9 waiting on condition [0x00007fa8725e3000]
"Default JMS Resource Adapter-worker- - 5" daemon prio=10 tid=0x00007fa8a0065800 nid=0x42ba waiting on condition [0x00007fa8724e2000]
"Default JMS Resource Adapter-worker- - 6" daemon prio=10 tid=0x00007fa8a0069000 nid=0x42bb waiting on condition [0x00007fa8723e1000]
"Default JMS Resource Adapter-worker- - 7" daemon prio=10 tid=0x00007fa8a0067800 nid=0x42bc waiting on condition [0x00007fa8722e0000]
"Default JMS Resource Adapter-worker- - 8" daemon prio=10 tid=0x00007fa8a0086800 nid=0x42be waiting on condition [0x00007fa8721df000]
"Default JMS Resource Adapter-worker- - 9" daemon prio=10 tid=0x00007fa8a00a1800 nid=0x42c2 waiting on condition [0x00007fa8720de000]
"Finalizer" daemon prio=10 tid=0x00007fa8fc072800 nid=0x40eb in Object.wait() [0x00007fa8f22d1000]
"GC Daemon" daemon prio=10 tid=0x00007fa8fc9a2000 nid=0x4103 in Object.wait() [0x00007fa8f158a000]
"GC task thread#0 (ParallelGC)" prio=10 tid=0x00007fa8fc021800 nid=0x40e5 runnable
"GC task thread#1 (ParallelGC)" prio=10 tid=0x00007fa8fc023800 nid=0x40e6 runnable
"GC task thread#2 (ParallelGC)" prio=10 tid=0x00007fa8fc025800 nid=0x40e7 runnable
"GC task thread#3 (ParallelGC)" prio=10 tid=0x00007fa8fc027000 nid=0x40e8 runnable
"http-bio-8080-Acceptor-0" daemon prio=10 tid=0x00007fa8fc853800 nid=0x4183 runnable [0x00007fa8739f8000]
"http-bio-8080-AsyncTimeout" daemon prio=10 tid=0x00007fa8fc855800 nid=0x4184 waiting on condition [0x00007fa873bfa000]
"http-bio-8080-exec-10" daemon prio=10 tid=0x00007fa874011800 nid=0x419c waiting on condition [0x00007fa872ceb000]
"http-bio-8080-exec-1" daemon prio=10 tid=0x00007fa88c054800 nid=0x4185 waiting on condition [0x00007fa873cfb000]
"http-bio-8080-exec-2" daemon prio=10 tid=0x00007fa89c755000 nid=0x418a waiting on condition [0x00007fa8734f3000]
"http-bio-8080-exec-3" daemon prio=10 tid=0x00007fa88c055800 nid=0x4194 waiting on condition [0x00007fa8733f2000]
"http-bio-8080-exec-4" daemon prio=10 tid=0x00007fa89c8ea000 nid=0x4195 waiting on condition [0x00007fa8732f1000]
"http-bio-8080-exec-5" daemon prio=10 tid=0x00007fa88c056000 nid=0x4196 waiting on condition [0x00007fa8731f0000]
"http-bio-8080-exec-6" daemon prio=10 tid=0x00007fa88c057800 nid=0x4197 waiting on condition [0x00007fa8730ef000]
"http-bio-8080-exec-7" daemon prio=10 tid=0x00007fa88c059000 nid=0x4198 waiting on condition [0x00007fa872fee000]
"http-bio-8080-exec-8" daemon prio=10 tid=0x00007fa89c8f6800 nid=0x419a waiting on condition [0x00007fa872eed000]
"http-bio-8080-exec-9" daemon prio=10 tid=0x00007fa874013000 nid=0x419b waiting on condition [0x00007fa872dec000]
"http-bio-8443-Acceptor-0" daemon prio=10 tid=0x00007fa8fc137000 nid=0x4186 runnable [0x00007fa8738f7000]
"http-bio-8443-AsyncTimeout" daemon prio=10 tid=0x00007fa8fc138000 nid=0x4187 waiting on condition [0x00007fa8737f6000]
"JDWP Command Reader" daemon prio=10 tid=0x00007fa8c4001000 nid=0x40f3 runnable [0x0000000000000000]
"JDWP Event Helper Thread" daemon prio=10 tid=0x00007fa8fc0a1000 nid=0x40f2 runnable [0x0000000000000000]
"JDWP Transport Listener: dt_socket" daemon prio=10 tid=0x00007fa8fc09d800 nid=0x40ef runnable [0x0000000000000000]
"main" prio=10 tid=0x00007fa8fc00c000 nid=0x40e4 runnable [0x00007fa902425000]
"oracle.jdbc.driver.BlockSource.ThreadedCachingBlockSource.BlockReleaser" daemon prio=10 tid=0x00007fa8fce2f000 nid=0x4127 in Object.wait() [0x00007fa8f147f000]
"org.apache.openejb.pool.scheduler.1" daemon prio=10 tid=0x00007fa8fc902000 nid=0x4147 waiting on condition [0x00007fa8f046f000]
"org.apache.openejb.pool.scheduler.2" daemon prio=10 tid=0x00007fa8fc903000 nid=0x4148 waiting on condition [0x00007fa8f036e000]
"org.apache.openejb.pool.scheduler.3" daemon prio=10 tid=0x00007fa8fc904800 nid=0x4149 waiting on condition [0x00007fa8f026d000]
"org.apache.openejb.pool.scheduler.4" daemon prio=10 tid=0x00007fa8fc906800 nid=0x414a waiting on condition [0x00007fa8f016c000]
"org.apache.openejb.pool.scheduler.5" daemon prio=10 tid=0x00007fa89c5b0000 nid=0x416f waiting on condition [0x00007fa8e8318000]
"PoolIdleReleaseTimer" daemon prio=10 tid=0x00007fa8fcc15800 nid=0x4146 in Object.wait() [0x00007fa8f0c77000]
"Reference Handler" daemon prio=10 tid=0x00007fa8fc06e800 nid=0x40ea in Object.wait() [0x00007fa8f23d2000]
"RetryTimer" daemon prio=10 tid=0x00007fa8fcb39000 nid=0x411d in Object.wait() [0x00007fa8f0b76000]
"RMI Reaper" prio=10 tid=0x00007fa8a4065800 nid=0x4122 in Object.wait() [0x00007fa8f0a75000]
"RMI RenewClean-[127.0.0.1:40595]" daemon prio=10 tid=0x00007fa8a000b800 nid=0x4125 in Object.wait() [0x00007fa8f0873000]
"RMI Scheduler(0)" daemon prio=10 tid=0x00007fa8a406f800 nid=0x4124 waiting on condition [0x00007fa8f0e79000]
"RMI TCP Accept-0" daemon prio=10 tid=0x00007fa8a4064800 nid=0x4121 runnable [0x00007fa8f0570000]
"RMI TCP Accept-1099" daemon prio=10 tid=0x00007fa8fcea7800 nid=0x411f runnable [0x00007fa8f0f7a000]
"Service Thread" daemon prio=10 tid=0x00007fa8fc0a9000 nid=0x40f8 runnable [0x0000000000000000]
"Signal Dispatcher" daemon prio=10 tid=0x00007fa8fc088800 nid=0x40ed waiting on condition [0x0000000000000000]
"Thread-19" daemon prio=10 tid=0x00007fa89c178800 nid=0x414f runnable [0x00007fa8e871c000]
"Thread-23" daemon prio=10 tid=0x00007fa89c8a5800 nid=0x41e5 waiting on condition [0x00007fa872ae9000]
"VM Periodic Task Thread" prio=10 tid=0x00007fa8fc0b3800 nid=0x40f9 waiting on condition
"VM Thread" prio=10 tid=0x00007fa8fc06c800 nid=0x40e9 runnable
甲骨文(不應實際影響,我想,但我在這裏包括,因爲JMS消息將持久)
SQL> show parameter session
NAME TYPE VALUE
------------------------------------ ----------- ------------------------------
java_max_sessionspace_size integer 0
java_soft_sessionspace_limit integer 0
license_max_sessions integer 0
license_sessions_warning integer 0
session_cached_cursors integer 50
session_max_open_files integer 10
sessions integer 324 <<<<<<<<<<<
shared_server_sessions integer
SQL> show parameter processes
NAME TYPE VALUE
------------------------------------ ----------- ------------------------------
aq_tm_processes integer 0
db_writer_processes integer 1
gcs_server_processes integer 0
global_txn_processes integer 1
job_queue_processes integer 0
log_archive_max_processes integer 4
processes integer 200 <<<<<<<<<<
仍然獲得10個JMS工作者和10個sim消耗消息的多餘線程。
不知道還有什麼可嘗試的。
如何增加工人數量?