2012-05-02 23 views
2

我在學習如何使用池連接,以便通過不必打開/關閉服務器通道來獲得更好的吞吐量,但似乎無法使其工作。當我分叉一個線程並使每個線程運行一個循環來轉儲數據時,我的代碼稍有修改版本,但現在我試圖使用ThreadPoolExecutor發送單個線程的作業,然後產生2個線程來處理處理工作。我的實驗應該有希望顯示在任何給定時間(或儘可能多的線程)打開2個通道,而是當我更改我的代碼時,我得到了illegalstateexception: pool not openillegalstateexception池不能用ObjectPool打開

我真的很困惑,如果我的游泳池設計錯誤或我對ThreadPoolExecutor的理解是有缺陷的。我對ThreadPoolExecutor的理解是,如果有工作要做,並且在每次迭代時都不會繼續查殺/重新生成,那麼它會保持線程活着。

這裏是代碼(你可以忽略所有rabbitmq的東西,它的要點是你需要打開一個連接到服務器,然後打開一個通道,我試圖打開一個連接到服務器,然後一個池的共享頻道)。我的想法是創建一個objectpool類的實例,然後將它傳遞給一個可運行的按需要的通道。

代碼:

import org.apache.commons.pool.BasePoolableObjectFactory; 
import org.apache.commons.pool.ObjectPool; 
import org.apache.commons.pool.PoolableObjectFactory; 
import org.apache.commons.pool.impl.GenericObjectPool; 

import java.io.IOException; 
import java.util.NoSuchElementException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.LinkedBlockingDeque; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 


import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.MessageProperties; 

public class PoolExample { 

    private static ExecutorService executor_worker; 
    static { 
     final int numberOfThreads_ThreadPoolExecutor = 2; 
     executor_worker = 
      new ThreadPoolExecutor(numberOfThreads_ThreadPoolExecutor, numberOfThreads_ThreadPoolExecutor, 1000, TimeUnit.SECONDS, 
            new LinkedBlockingDeque<Runnable>()); 
    } 

    public static void main(String[] args) throws Exception { 
     System.out.println("starting..");  
     PoolableObjectFactory<MyPooledObject> factory = new MyPoolableObjectFactory(); 
     ObjectPool<MyPooledObject> pool = new GenericObjectPool<MyPooledObject>(factory); 
     for (int x = 0; x<500000000; x++) { 
      executor_worker.submit(new Thread(new MyRunnable(x, pool))); 
     } 
    } 
} 

class MyPooledObject { 
    //Connection connection; 
    Channel channel; 
    public MyPooledObject() throws IOException { 
     System.out.println("hello world"); 
     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(); 
     channel = connection.createChannel(); 
    } 

    public Channel sing() throws IOException { 
     //System.out.println("mary had a little lamb"); 
     return channel; 
    } 

    public void destroy() { 
     System.out.println("goodbye cruel world");  
    } 
} 

class MyPoolableObjectFactory extends BasePoolableObjectFactory<MyPooledObject> { 
    @Override 
    public MyPooledObject makeObject() throws Exception { 
     return new MyPooledObject(); 
    } 
    @Override 
    public void destroyObject(MyPooledObject obj) throws Exception { 
     obj.destroy(); 
    } 
} 

class MyRunnable implements Runnable{ 
    protected int x = 0; 
    protected ObjectPool<MyPooledObject> pool = null; 

    public MyRunnable(int x, ObjectPool<MyPooledObject> pool) { 
     // TODO Auto-generated constructor stub 
     this.x = x; 
     this.pool = pool; 
    } 

    public void run(){ 
     try { 
       MyPooledObject obj; 
       obj = pool.borrowObject(); 
       Channel channel = obj.sing(); 
       String message = Integer.toString(x); 
       channel.basicPublish("", "task_queue", 
         MessageProperties.PERSISTENT_TEXT_PLAIN, 
         message.getBytes()); 
       pool.returnObject(obj); 
     } catch (NoSuchElementException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } catch (IllegalStateException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } catch (Exception e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } finally { 
      try { 
       pool.close(); 
      } catch (Exception e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     } 
    } 
} 

任何想法有什麼錯我的設計?或者我的整個方法是否存在有缺陷的對象池?

UPDATE1:每個請求這裏是堆棧跟蹤(我得到很多很多,這些連續的):

堆棧跟蹤:

java.lang.IllegalStateException: Pool not open 
    at org.apache.commons.pool.BaseObjectPool.assertOpen(BaseObjectPool.java:140) 
    at org.apache.commons.pool.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:1079) 
    at MyRunnable.run(PoolExample.java:85) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441) 
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:138) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) 
    at java.lang.Thread.run(Thread.java:680) 

如果有幫助,線85在我的代碼(其中這個錯誤是觸發的):obj = pool.borrowObject();

UPDATE2:很奇怪。我得到錯誤,但它確實將2個項目寫入隊列。我不想發送任何人進行大雁追逐,但我認爲這意味着它可以在創建它們時成功借用對象,但是當它們返回到池中時不能成功借用對象。

UPDATE3:我設計了代碼,所以它沒有經過我上面的中間步驟。我不再發生錯誤,但它基本上沒有任何功能。我啓動10個線程並期望10個通道,但是我只有幾秒鐘才能獲得1個通道,然後它也會關閉。 代碼:

import org.apache.commons.pool.BasePoolableObjectFactory; 
import org.apache.commons.pool.ObjectPool; 
import org.apache.commons.pool.PoolableObjectFactory; 
import org.apache.commons.pool.impl.GenericObjectPool; 

import java.io.IOException; 
import java.util.NoSuchElementException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.LinkedBlockingDeque; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 


import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.MessageProperties; 

public class PoolExample { 

    private static ExecutorService executor_worker; 
    static { 
     final int numberOfThreads_ThreadPoolExecutor = 20; 
     executor_worker = 
      new ThreadPoolExecutor(numberOfThreads_ThreadPoolExecutor, numberOfThreads_ThreadPoolExecutor, 1000, TimeUnit.SECONDS, 
            new LinkedBlockingDeque<Runnable>()); 
    } 

    public static void main(String[] args) throws Exception { 
     System.out.println("starting..");   
     ObjectPool<Channel> pool = 
       new GenericObjectPool<Channel>(
       new ConnectionPoolableObjectFactory(), 5); 

     for (int x = 0; x<500000000; x++) { 
      executor_worker.submit(new MyRunnable(x, pool)); 
     } 
     executor_worker.shutdown(); 
     pool.close(); 
    } 
} 

class ConnectionPoolableObjectFactory extends BasePoolableObjectFactory<Channel> { 
    Channel channel; 

    public ConnectionPoolableObjectFactory() throws IOException { 
     System.out.println("hello world"); 
     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(); 
     channel = connection.createChannel(); 
    } 

    @Override 
    public Channel makeObject() throws Exception {     
     return channel; 
    } 

    @Override 
    public boolean validateObject(Channel channel) { 
     return channel.isOpen(); 
    } 

    @Override 
    public void destroyObject(Channel channel) throws Exception { 
     channel.close(); 
    } 

    @Override 
    public void passivateObject(Channel channel) throws Exception { 
     //System.out.println("sent back to queue"); 
    } 
} 

class MyRunnable implements Runnable{ 
    protected int x = 0; 
    protected ObjectPool<Channel> pool; 

    public MyRunnable(int x, ObjectPool<Channel> pool) { 
     // TODO Auto-generated constructor stub 
     this.x = x; 
     this.pool = pool; 
    } 

    public void run(){ 
     try { 
       Channel channel = pool.borrowObject(); 
       String message = Integer.toString(x); 
       channel.basicPublish("", "task_queue", 
         MessageProperties.PERSISTENT_TEXT_PLAIN, 
         message.getBytes()); 
       pool.returnObject(channel); 
     } catch (NoSuchElementException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } catch (IllegalStateException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } catch (Exception e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 
} 
+0

什麼是異常的完整堆棧跟蹤? – Jeffrey

+0

@Jeffrey更新了包含它的答案。 – Lostsoul

回答

1

我不知道它是什麼原因造成的例外,但這是錯誤的:

executor_worker.submit(new Thread(new MyRunnable(x, pool))); 

你不應該傳遞線程對象submit。你應該只是傳遞MyRunnable的實例。

executor_worker.submit(new MyRunnable(x, pool)); 
+0

謝謝,我會在未來的代碼中遵循您的建議,但即使進行了更改,錯誤仍然存​​在。 – Lostsoul