2014-02-11 31 views
1

我真的不知道瓶頸在哪裏。如何增加activemq + tomee的同步消費者數量?

我從零開始創建了一個新項目來嘗試隔離變量。

現在我必須爲

製作簡單的東西:

public void createTasks() throws JMSException { 
    for(int i=0;i<1000;i++){ 
     if (i%100 == 0){ 
      System.out.println(i); 
     } 
     MyEntity my = new MyEntity("xyz"+i); 
     my = this.baseService.getMyEntityDAO().addAndFlush(my); 
     jmsService.enqueue(my); 
    }  
    } 

JMS

@Singleton 
@Lock(LockType.WRITE) 
public class JmsService implements Serializable{ 
    public void enqueue(MyEntity target) throws JMSException{ 
    Connection connection = null; 
    Session session = null; 
    try { 
     ConnectionFactory connectionFactory = this.connectionFactory; 
     connection = connectionFactory.createConnection(); 
     connection.start(); 

     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     Queue queue = this.driverJobQueue; 
     MessageProducer producer = session.createProducer(queue); 

     Message message = session.createMessage(); 
     message.setLongProperty("id",target.getId()); 
     message.setStringProperty("type", "xyz"); 
     producer.send(message,DeliveryMode.PERSISTENT,1,0); 
    } finally { 
     // Clean up 
     if (session != null){ 
      session.close(); 
     } 

     if (connection != null){ 
      connection.close(); 
     } 
    } 

    } 

消費者

import javax.annotation.PostConstruct; 
import javax.ejb.ActivationConfigProperty; 
import javax.ejb.EJB; 
import javax.ejb.MessageDriven; 
import javax.inject.Inject; 
import javax.jms.Message; 
import javax.jms.MessageListener; 
import javax.persistence.EntityManager; 
import javax.persistence.PersistenceContext; 

@MessageDriven(activationConfig = { 
    @ActivationConfigProperty(
      propertyName = "destinationType", 
      propertyValue = "javax.jms.Queue"), 
    @ActivationConfigProperty(
      propertyName = "destination", 
      propertyValue = "DriverJobQueue"), 
    @ActivationConfigProperty(
      propertyName = "messageSelector", 
      propertyValue = "type = 'xyz'"), 
    @ActivationConfigProperty(
      propertyName = "acknowledgeMode", 
      propertyValue = "Auto-acknowledge"), 

    @ActivationConfigProperty(
      propertyName = "maxSessions", 
      propertyValue = "30"), 
    @ActivationConfigProperty(
      propertyName = "maxMessagesPerBatch", 
      propertyValue = "30"), 

    @ActivationConfigProperty(
      propertyName = "maxMessagesPerSessions", 
      propertyValue = "30") }) 


public class MyWorker implements MessageListener { 
    @Override 
    public void onMessage(Message msg) { 
    MyEntity entity = null; 
    Long id = null; 
    try { 
     id = msg.getLongProperty("id"); 

     if (entity != null) { 
      System.out.println(id); 
     }else{ 
      System.out.println("ops"); 
     } 

     System.out.println(this+" is processing "+entity); 

    } catch (Throwable e) { 
     e.printStackTrace(); 
    } 

    try { 
     Thread.sleep(10000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 

    System.out.println(this+" finished "+entity); 

    } 
} 

tomee.xml

<Resource id="Default JMS Resource Adapter" type="ActiveMQResourceAdapter"> 
    BrokerXmlConfig = xbean:file:/pathto/activemq.xml 
    ServerUrl = tcp://0.0.0.0:61616 
    threadPoolSize 30 <<<<<<< does that really works? 
    </Resource> 

    <Container id="MyJmsMdbContainer" ctype="MESSAGE"> 
    ResourceAdapter = Default JMS Resource Adapter 
    </Container> 

    <Resource id="Fooo" type="javax.jms.ConnectionFactory"> 
    PoolMaxSize 30 
    </Resource> 

    <Container id="myAllContainer" type="STATELESS"> 
    PoolSize 30 
    StrictPooling true 
    </Container> 

    <Container id="msg" type="MESSAGE"> 
    InstanceLimit 30 
    </Container> 

    <Container id="Foo" type="BMP_ENTITY"> 
    PoolSize 30 
    </Container> 

activemq.xml中

<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 
     http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> 

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}"> 
    <destinationPolicy> 
     <policyMap> 
      <policyEntries> 
       <policyEntry topic=">" producerFlowControl="true"> 
        <pendingMessageLimitStrategy> 
         <constantPendingMessageLimitStrategy limit="1000" /> 
        </pendingMessageLimitStrategy> 
       </policyEntry> 
       <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb"> 
       </policyEntry> 
      </policyEntries> 
     </policyMap> 
    </destinationPolicy> 


    <persistenceAdapter> 
     <jdbcPersistenceAdapter dataSource="#oracle-ds" /> 
    </persistenceAdapter> 

    <systemUsage> 
     <systemUsage> 
      <memoryUsage> 
       <memoryUsage limit="128 mb" /> 

      </memoryUsage> 
      <storeUsage> 
       <storeUsage limit="100 gb" /> 
      </storeUsage> 
      <tempUsage> 
       <tempUsage limit="50 gb" /> 
      </tempUsage> 
     </systemUsage> 
    </systemUsage> 

    <transportConnectors> 
     <transportConnector name="tcp" uri="tcp://0.0.0.0:61616" /> 

    </transportConnectors> 
    </broker> 

    <bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> 
    <property name="driverClassName" value="oracle.jdbc.OracleDriver" /> 
    <property name="url" value="jdbc:oracle:thin:@localhost:1521:XE" /> 
    <property name="username" value="xxx" /> 
    <property name="password" value="xxx" /> 
    <property name="poolPreparedStatements" value="true" /> 
    <property name="maxActive " value="30" /> 
    </bean> 

</beans> 

殺-3在卡特琳娜給了我

[[email protected] ~]$ grep prio 2.txt | sort 
"ActiveMQ Broker[localhost] Scheduler" daemon prio=10 tid=0x00007fa8fcea3000 nid=0x411e in Object.wait() [0x00007fa8f0d78000] 
"ActiveMQ BrokerService[localhost] Task-1" daemon prio=10 tid=0x00007fa88c041800 nid=0x4171 waiting on condition [0x00007fa8e8217000] 
"ActiveMQ BrokerService[localhost] Task-2" daemon prio=10 tid=0x00007fa89c5a8800 nid=0x4175 waiting on condition [0x00007fa873dfc000] 
"ActiveMQ BrokerService[localhost] Task-3" daemon prio=10 tid=0x00007fa874020000 nid=0x42b5 waiting on condition [0x00007fa8f107b000] 
"ActiveMQ InactivityMonitor ReadCheckTimer" daemon prio=10 tid=0x00007fa8a003d800 nid=0x4173 in Object.wait() [0x00007fa873ffe000] 
"ActiveMQ InactivityMonitor Worker" daemon prio=10 tid=0x00007fa88c05a800 nid=0x41b3 waiting on condition [0x00007fa8e891e000] 
"ActiveMQ InactivityMonitor Worker" daemon prio=10 tid=0x00007fa88c05d000 nid=0x4220 waiting on condition [0x00007fa8f117c000] 
"ActiveMQ InactivityMonitor WriteCheckTimer" daemon prio=10 tid=0x00007fa8a003e800 nid=0x4174 in Object.wait() [0x00007fa873efd000] 
"ActiveMQ JDBC PA Scheduled Task" daemon prio=10 tid=0x00007fa878003800 nid=0x413d waiting on condition [0x00007fa8f0772000] 
"ActiveMQ JDBC PA Scheduled Task" daemon prio=10 tid=0x00007fa89403a000 nid=0x41bc waiting on condition [0x00007fa872bea000] 
"ActiveMQ JDBC PA Scheduled Task" daemon prio=10 tid=0x00007fa8a0041800 nid=0x4205 waiting on condition [0x00007fa8728e7000] 
"ActiveMQ JDBC PA Scheduled Task" daemon prio=10 tid=0x00007fa8fcf12000 nid=0x4137 waiting on condition [0x00007fa8f0974000] 
"ActiveMQ JDBC PA Scheduled Task" daemon prio=10 tid=0x00007fa8fcf15800 nid=0x4138 waiting on condition [0x00007fa8f0671000] 
"ActiveMQ Transport Server: tcp://0.0.0.0:61616" daemon prio=10 tid=0x00007fa8fc7b2000 nid=0x4143 runnable [0x00007fa8f137e000] 
"ActiveMQ Transport Server Thread Handler: tcp://0.0.0.0:61616" daemon prio=10 tid=0x00007fa8fc7b1000 nid=0x4142 waiting on condition [0x00007fa8f127d000] 
"ActiveMQ Transport: tcp:///127.0.0.1:[email protected]" daemon prio=10 tid=0x00007fa89c5a7800 nid=0x4172 runnable [0x00007fa8e8116000] 
"ActiveMQ Transport: tcp:///127.0.0.1:[email protected]" daemon prio=10 tid=0x00007fa87400f000 nid=0x41fd runnable [0x00007fa8729e8000] 
"ActiveMQ Transport: tcp://oc7612866413.ibm.com/127.0.0.1:[email protected]" prio=10 tid=0x00007fa88c035800 nid=0x4170 runnable [0x00007fa8e851a000] 
"ActiveMQ Transport: tcp://oc7612866413.ibm.com/127.0.0.1:[email protected]" prio=10 tid=0x00007fa89c50b800 nid=0x41fc runnable [0x00007fa8e881d000] 
"ajp-bio-8009-Acceptor-0" daemon prio=10 tid=0x00007fa8fc13a000 nid=0x4188 runnable [0x00007fa8736f5000] 
"ajp-bio-8009-AsyncTimeout" daemon prio=10 tid=0x00007fa8fc13c000 nid=0x4189 waiting on condition [0x00007fa8735f4000] 
"Attach Listener" daemon prio=10 tid=0x00007fa8d0001000 nid=0x415b runnable [0x0000000000000000] 
"C2 CompilerThread0" daemon prio=10 tid=0x00007fa8fc0a3800 nid=0x40f6 waiting on condition [0x0000000000000000] 
"C2 CompilerThread1" daemon prio=10 tid=0x00007fa8fc0a6800 nid=0x40f7 waiting on condition [0x0000000000000000] 
"ContainerBackgroundProcessor[StandardEngine[Catalina]]" daemon prio=10 tid=0x00007fa8fc853000 nid=0x4182 waiting on condition [0x00007fa873af9000] 
"Default JMS Resource Adapter-worker- - 10" daemon prio=10 tid=0x00007fa8a00a4800 nid=0x42c3 waiting on condition [0x00007fa871fdd000] 
"Default JMS Resource Adapter-worker- - 11" daemon prio=10 tid=0x00007fa8a00a6000 nid=0x42c4 waiting on condition [0x00007fa871edc000] 
"Default JMS Resource Adapter-worker- - 1" daemon prio=10 tid=0x00007fa89c597800 nid=0x416e waiting on condition [0x00007fa8e8419000] 
"Default JMS Resource Adapter-worker- - 2" daemon prio=10 tid=0x00007fa8a005e000 nid=0x42b7 waiting on condition [0x00007fa8727e5000] 
"Default JMS Resource Adapter-worker- - 3" daemon prio=10 tid=0x00007fa8a005f000 nid=0x42b8 waiting on condition [0x00007fa8726e4000] 
"Default JMS Resource Adapter-worker- - 4" daemon prio=10 tid=0x00007fa8a0063800 nid=0x42b9 waiting on condition [0x00007fa8725e3000] 
"Default JMS Resource Adapter-worker- - 5" daemon prio=10 tid=0x00007fa8a0065800 nid=0x42ba waiting on condition [0x00007fa8724e2000] 
"Default JMS Resource Adapter-worker- - 6" daemon prio=10 tid=0x00007fa8a0069000 nid=0x42bb waiting on condition [0x00007fa8723e1000] 
"Default JMS Resource Adapter-worker- - 7" daemon prio=10 tid=0x00007fa8a0067800 nid=0x42bc waiting on condition [0x00007fa8722e0000] 
"Default JMS Resource Adapter-worker- - 8" daemon prio=10 tid=0x00007fa8a0086800 nid=0x42be waiting on condition [0x00007fa8721df000] 
"Default JMS Resource Adapter-worker- - 9" daemon prio=10 tid=0x00007fa8a00a1800 nid=0x42c2 waiting on condition [0x00007fa8720de000] 
"Finalizer" daemon prio=10 tid=0x00007fa8fc072800 nid=0x40eb in Object.wait() [0x00007fa8f22d1000] 
"GC Daemon" daemon prio=10 tid=0x00007fa8fc9a2000 nid=0x4103 in Object.wait() [0x00007fa8f158a000] 
"GC task thread#0 (ParallelGC)" prio=10 tid=0x00007fa8fc021800 nid=0x40e5 runnable 
"GC task thread#1 (ParallelGC)" prio=10 tid=0x00007fa8fc023800 nid=0x40e6 runnable 
"GC task thread#2 (ParallelGC)" prio=10 tid=0x00007fa8fc025800 nid=0x40e7 runnable 
"GC task thread#3 (ParallelGC)" prio=10 tid=0x00007fa8fc027000 nid=0x40e8 runnable 
"http-bio-8080-Acceptor-0" daemon prio=10 tid=0x00007fa8fc853800 nid=0x4183 runnable [0x00007fa8739f8000] 
"http-bio-8080-AsyncTimeout" daemon prio=10 tid=0x00007fa8fc855800 nid=0x4184 waiting on condition [0x00007fa873bfa000] 
"http-bio-8080-exec-10" daemon prio=10 tid=0x00007fa874011800 nid=0x419c waiting on condition [0x00007fa872ceb000] 
"http-bio-8080-exec-1" daemon prio=10 tid=0x00007fa88c054800 nid=0x4185 waiting on condition [0x00007fa873cfb000] 
"http-bio-8080-exec-2" daemon prio=10 tid=0x00007fa89c755000 nid=0x418a waiting on condition [0x00007fa8734f3000] 
"http-bio-8080-exec-3" daemon prio=10 tid=0x00007fa88c055800 nid=0x4194 waiting on condition [0x00007fa8733f2000] 
"http-bio-8080-exec-4" daemon prio=10 tid=0x00007fa89c8ea000 nid=0x4195 waiting on condition [0x00007fa8732f1000] 
"http-bio-8080-exec-5" daemon prio=10 tid=0x00007fa88c056000 nid=0x4196 waiting on condition [0x00007fa8731f0000] 
"http-bio-8080-exec-6" daemon prio=10 tid=0x00007fa88c057800 nid=0x4197 waiting on condition [0x00007fa8730ef000] 
"http-bio-8080-exec-7" daemon prio=10 tid=0x00007fa88c059000 nid=0x4198 waiting on condition [0x00007fa872fee000] 
"http-bio-8080-exec-8" daemon prio=10 tid=0x00007fa89c8f6800 nid=0x419a waiting on condition [0x00007fa872eed000] 
"http-bio-8080-exec-9" daemon prio=10 tid=0x00007fa874013000 nid=0x419b waiting on condition [0x00007fa872dec000] 
"http-bio-8443-Acceptor-0" daemon prio=10 tid=0x00007fa8fc137000 nid=0x4186 runnable [0x00007fa8738f7000] 
"http-bio-8443-AsyncTimeout" daemon prio=10 tid=0x00007fa8fc138000 nid=0x4187 waiting on condition [0x00007fa8737f6000] 
"JDWP Command Reader" daemon prio=10 tid=0x00007fa8c4001000 nid=0x40f3 runnable [0x0000000000000000] 
"JDWP Event Helper Thread" daemon prio=10 tid=0x00007fa8fc0a1000 nid=0x40f2 runnable [0x0000000000000000] 
"JDWP Transport Listener: dt_socket" daemon prio=10 tid=0x00007fa8fc09d800 nid=0x40ef runnable [0x0000000000000000] 
"main" prio=10 tid=0x00007fa8fc00c000 nid=0x40e4 runnable [0x00007fa902425000] 
"oracle.jdbc.driver.BlockSource.ThreadedCachingBlockSource.BlockReleaser" daemon prio=10 tid=0x00007fa8fce2f000 nid=0x4127 in Object.wait() [0x00007fa8f147f000] 
"org.apache.openejb.pool.scheduler.1" daemon prio=10 tid=0x00007fa8fc902000 nid=0x4147 waiting on condition [0x00007fa8f046f000] 
"org.apache.openejb.pool.scheduler.2" daemon prio=10 tid=0x00007fa8fc903000 nid=0x4148 waiting on condition [0x00007fa8f036e000] 
"org.apache.openejb.pool.scheduler.3" daemon prio=10 tid=0x00007fa8fc904800 nid=0x4149 waiting on condition [0x00007fa8f026d000] 
"org.apache.openejb.pool.scheduler.4" daemon prio=10 tid=0x00007fa8fc906800 nid=0x414a waiting on condition [0x00007fa8f016c000] 
"org.apache.openejb.pool.scheduler.5" daemon prio=10 tid=0x00007fa89c5b0000 nid=0x416f waiting on condition [0x00007fa8e8318000] 
"PoolIdleReleaseTimer" daemon prio=10 tid=0x00007fa8fcc15800 nid=0x4146 in Object.wait() [0x00007fa8f0c77000] 
"Reference Handler" daemon prio=10 tid=0x00007fa8fc06e800 nid=0x40ea in Object.wait() [0x00007fa8f23d2000] 
"RetryTimer" daemon prio=10 tid=0x00007fa8fcb39000 nid=0x411d in Object.wait() [0x00007fa8f0b76000] 
"RMI Reaper" prio=10 tid=0x00007fa8a4065800 nid=0x4122 in Object.wait() [0x00007fa8f0a75000] 
"RMI RenewClean-[127.0.0.1:40595]" daemon prio=10 tid=0x00007fa8a000b800 nid=0x4125 in Object.wait() [0x00007fa8f0873000] 
"RMI Scheduler(0)" daemon prio=10 tid=0x00007fa8a406f800 nid=0x4124 waiting on condition [0x00007fa8f0e79000] 
"RMI TCP Accept-0" daemon prio=10 tid=0x00007fa8a4064800 nid=0x4121 runnable [0x00007fa8f0570000] 
"RMI TCP Accept-1099" daemon prio=10 tid=0x00007fa8fcea7800 nid=0x411f runnable [0x00007fa8f0f7a000] 
"Service Thread" daemon prio=10 tid=0x00007fa8fc0a9000 nid=0x40f8 runnable [0x0000000000000000] 
"Signal Dispatcher" daemon prio=10 tid=0x00007fa8fc088800 nid=0x40ed waiting on condition [0x0000000000000000] 
"Thread-19" daemon prio=10 tid=0x00007fa89c178800 nid=0x414f runnable [0x00007fa8e871c000] 
"Thread-23" daemon prio=10 tid=0x00007fa89c8a5800 nid=0x41e5 waiting on condition [0x00007fa872ae9000] 
"VM Periodic Task Thread" prio=10 tid=0x00007fa8fc0b3800 nid=0x40f9 waiting on condition 
"VM Thread" prio=10 tid=0x00007fa8fc06c800 nid=0x40e9 runnable 

甲骨文(不應實際影響,我想,但我在這裏包括,因爲JMS消息將持久)

SQL> show parameter session 

NAME      TYPE  VALUE 
------------------------------------ ----------- ------------------------------ 
java_max_sessionspace_size   integer  0 
java_soft_sessionspace_limit   integer  0 
license_max_sessions    integer  0 
license_sessions_warning   integer  0 
session_cached_cursors    integer  50 
session_max_open_files    integer  10 
sessions     integer  324 <<<<<<<<<<< 
shared_server_sessions    integer 
SQL> show parameter processes 

NAME      TYPE  VALUE 
------------------------------------ ----------- ------------------------------ 
aq_tm_processes    integer  0 
db_writer_processes    integer  1 
gcs_server_processes    integer  0 
global_txn_processes    integer  1 
job_queue_processes    integer  0 
log_archive_max_processes   integer  4 
processes     integer  200 <<<<<<<<<< 

仍然獲得10個JMS工作者和10個sim消耗消息的多餘線程。

不知道還有什麼可嘗試的。

如何增加工人數量?

回答

1
@Singleton 
@Lock(LockType.WRITE) 
public class JmsService implements Serializable{ 

乍一看,是不是你的瓶頸?從您發佈的代碼,爲什麼不把這樣的:

@Stateless 
public class JmsService implements Serializable{ 

而當你需要你有儘可能多的情況下...