我在學習如何使用池連接,以便通過不必打開/關閉服務器通道來獲得更好的吞吐量,但似乎無法使其工作。當我分叉一個線程並使每個線程運行一個循環來轉儲數據時,我的代碼稍有修改版本,但現在我試圖使用ThreadPoolExecutor
發送單個線程的作業,然後產生2個線程來處理處理工作。我的實驗應該有希望顯示在任何給定時間(或儘可能多的線程)打開2個通道,而是當我更改我的代碼時,我得到了illegalstateexception: pool not open
illegalstateexception池不能用ObjectPool打開
我真的很困惑,如果我的游泳池設計錯誤或我對ThreadPoolExecutor的理解是有缺陷的。我對ThreadPoolExecutor的理解是,如果有工作要做,並且在每次迭代時都不會繼續查殺/重新生成,那麼它會保持線程活着。
這裏是代碼(你可以忽略所有rabbitmq的東西,它的要點是你需要打開一個連接到服務器,然後打開一個通道,我試圖打開一個連接到服務器,然後一個池的共享頻道)。我的想法是創建一個objectpool類的實例,然後將它傳遞給一個可運行的按需要的通道。
代碼:
import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.PoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
public class PoolExample {
private static ExecutorService executor_worker;
static {
final int numberOfThreads_ThreadPoolExecutor = 2;
executor_worker =
new ThreadPoolExecutor(numberOfThreads_ThreadPoolExecutor, numberOfThreads_ThreadPoolExecutor, 1000, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>());
}
public static void main(String[] args) throws Exception {
System.out.println("starting..");
PoolableObjectFactory<MyPooledObject> factory = new MyPoolableObjectFactory();
ObjectPool<MyPooledObject> pool = new GenericObjectPool<MyPooledObject>(factory);
for (int x = 0; x<500000000; x++) {
executor_worker.submit(new Thread(new MyRunnable(x, pool)));
}
}
}
class MyPooledObject {
//Connection connection;
Channel channel;
public MyPooledObject() throws IOException {
System.out.println("hello world");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
channel = connection.createChannel();
}
public Channel sing() throws IOException {
//System.out.println("mary had a little lamb");
return channel;
}
public void destroy() {
System.out.println("goodbye cruel world");
}
}
class MyPoolableObjectFactory extends BasePoolableObjectFactory<MyPooledObject> {
@Override
public MyPooledObject makeObject() throws Exception {
return new MyPooledObject();
}
@Override
public void destroyObject(MyPooledObject obj) throws Exception {
obj.destroy();
}
}
class MyRunnable implements Runnable{
protected int x = 0;
protected ObjectPool<MyPooledObject> pool = null;
public MyRunnable(int x, ObjectPool<MyPooledObject> pool) {
// TODO Auto-generated constructor stub
this.x = x;
this.pool = pool;
}
public void run(){
try {
MyPooledObject obj;
obj = pool.borrowObject();
Channel channel = obj.sing();
String message = Integer.toString(x);
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
pool.returnObject(obj);
} catch (NoSuchElementException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IllegalStateException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
try {
pool.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
任何想法有什麼錯我的設計?或者我的整個方法是否存在有缺陷的對象池?
UPDATE1:每個請求這裏是堆棧跟蹤(我得到很多很多,這些連續的):
堆棧跟蹤:
java.lang.IllegalStateException: Pool not open
at org.apache.commons.pool.BaseObjectPool.assertOpen(BaseObjectPool.java:140)
at org.apache.commons.pool.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:1079)
at MyRunnable.run(PoolExample.java:85)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:680)
如果有幫助,線85在我的代碼(其中這個錯誤是觸發的):obj = pool.borrowObject();
UPDATE2:很奇怪。我得到錯誤,但它確實將2個項目寫入隊列。我不想發送任何人進行大雁追逐,但我認爲這意味着它可以在創建它們時成功借用對象,但是當它們返回到池中時不能成功借用對象。
UPDATE3:我設計了代碼,所以它沒有經過我上面的中間步驟。我不再發生錯誤,但它基本上沒有任何功能。我啓動10個線程並期望10個通道,但是我只有幾秒鐘才能獲得1個通道,然後它也會關閉。 代碼:
import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.PoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
public class PoolExample {
private static ExecutorService executor_worker;
static {
final int numberOfThreads_ThreadPoolExecutor = 20;
executor_worker =
new ThreadPoolExecutor(numberOfThreads_ThreadPoolExecutor, numberOfThreads_ThreadPoolExecutor, 1000, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>());
}
public static void main(String[] args) throws Exception {
System.out.println("starting..");
ObjectPool<Channel> pool =
new GenericObjectPool<Channel>(
new ConnectionPoolableObjectFactory(), 5);
for (int x = 0; x<500000000; x++) {
executor_worker.submit(new MyRunnable(x, pool));
}
executor_worker.shutdown();
pool.close();
}
}
class ConnectionPoolableObjectFactory extends BasePoolableObjectFactory<Channel> {
Channel channel;
public ConnectionPoolableObjectFactory() throws IOException {
System.out.println("hello world");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
channel = connection.createChannel();
}
@Override
public Channel makeObject() throws Exception {
return channel;
}
@Override
public boolean validateObject(Channel channel) {
return channel.isOpen();
}
@Override
public void destroyObject(Channel channel) throws Exception {
channel.close();
}
@Override
public void passivateObject(Channel channel) throws Exception {
//System.out.println("sent back to queue");
}
}
class MyRunnable implements Runnable{
protected int x = 0;
protected ObjectPool<Channel> pool;
public MyRunnable(int x, ObjectPool<Channel> pool) {
// TODO Auto-generated constructor stub
this.x = x;
this.pool = pool;
}
public void run(){
try {
Channel channel = pool.borrowObject();
String message = Integer.toString(x);
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
pool.returnObject(channel);
} catch (NoSuchElementException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IllegalStateException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
什麼是異常的完整堆棧跟蹤? – Jeffrey
@Jeffrey更新了包含它的答案。 – Lostsoul