2013-05-21 20 views
13

我需要在Java中構建一個工作者池,其中每個工作者都有自己的連接套接字;當工作線程運行時,它使用套接字,但保持打開狀態以便稍後重用。我們決定採用這種方法,因爲與臨時創建,連接和銷燬套接字相關的開銷需要太多的開銷,所以我們需要一種方法,通過這種方法,一組工作者通過套接字連接進行預初始化,隨時準備承擔的工作,同時保持插座資源從其它線程(插座不是線程安全的)安全,所以我們需要沿着這些路線的東西...預初始化工作線程池以重用連接對象(套接字)

public class SocketTask implements Runnable { 
    Socket socket; 
    public SocketTask(){ 
    //create + connect socket here 
    } 

    public void run(){ 
    //use socket here 
    } 

}

在應用程序啓動時,我們要初始化工作人員以及希望的插座連接太...

MyWorkerPool pool = new MyWorkerPool(); 
for(int i = 0; i < 100; i++) 
    pool.addWorker(new WorkerThread()); 

隨着工作由應用程序的要求,我們派任務給工人池立即執行......

pool.queueWork(new SocketTask(..)); 


更新的工作代碼
基於從灰色和jontejj有益的意見,我有下面的代碼工作...

SocketTask

public class SocketTask implements Runnable { 
    private String workDetails; 
    private static final ThreadLocal<Socket> threadLocal = 
      new ThreadLocal<Socket>(){ 
     @Override 
     protected Socket initialValue(){ 
      return new Socket(); 
     }   
    }; 

    public SocketTask(String details){    
     this.workDetails = details; 
    } 

    public void run(){  
     Socket s = getSocket(); //gets from threadlocal 
     //send data on socket based on workDetails, etc. 
    } 

    public static Socket getSocket(){ 
     return threadLocal.get(); 
    } 
} 

的ExecutorService

ExecutorService threadPool = 
    Executors.newFixedThreadPool(5, Executors.defaultThreadFactory()); 

    int tasks = 15; 
    for(int i = 1; i <= tasks; i++){ 
     threadPool.execute(new SocketTask("foobar-" + i)); 
    } 

我喜歡這種方法有以下幾個原因...

  • 套接字是本地對象(通過ThreadLocal的)提供給正在運行的任務,消除了併發問題。
  • 套接字創建一次並保持打開狀態,當新任務排隊時重複使用 ,消除套接字對象創建/銷燬開銷。
+0

謝謝你的工作代碼片段,對我幫助很大!我想知道:當線程池關閉時,如何關閉數據庫連接? – kaufmanu

+0

搜索執行線程的[有序關閉](http://stackoverflow.com/questions/3332832/graceful-shutdown-of-threads-and-executor),希望它有幫助。 – raffian

+0

我查過了,謝謝!我沒有完全幫助我,雖然...我正在尋找一種方法來調用存儲在ThreadLocal中的所有數據庫連接的dbconnection.close()。我真的不知道如何處理這個......我的意思是,一旦線程池被關閉,db連接就不會自動關閉,對吧? – kaufmanu

回答

10

想法之一是將Socket s放入BlockingQueue。然後,無論何時您需要Socket您的線程可以從隊列take(),當他們完成Socket他們put()它回到隊列中。

public void run() { 
    Socket socket = socketQueue.take(); 
    try { 
     // use the socket ... 
    } finally { 
     socketQueue.put(socket); 
    } 
} 

這有額外的好處:

  • 你可以回去使用ExecutorService代碼。
  • 您可以將套接字通信與處理結果分開。
  • 您不需要1對1的對應關係來處理線程和套接字。但套接字通信可能是98%的工作,所以也許沒有收穫。
  • 當你完成並且你的ExecutorService完成時,你可以通過將它們出隊並關閉它們來關閉你的套接字。

這確實增加了另一個BlockingQueue的額外開銷,但如果你正在做Socket通信,你不會注意到它。

我們不相信的ThreadFactory滿足我們的需要......

我想你可以做這個工作,如果你使用的線程本地人。您的線程工廠將創建一個線程,該線程首先打開套接字並將其存儲在線程本地,然後調用Runnable arg,該套接字完成與套接字的所有工作,從ExecutorService內部隊列中取出作業。一旦完成,arg.run()方法將完成,你可以從線程本地獲取套接字並關閉它。

類似於以下內容。這有點亂,但你應該明白。

ExecutorService threadPool = 
    Executors.newFixedThreadPool(10, 
     new ThreadFactory() { 
     public Thread newThread(final Runnable r) { 
      Thread thread = new Thread(new Runnable() { 
       public void run() { 
        openSocketAndStoreInThreadLocal(); 
        // our tasks would then get the socket from the thread-local 
        r.run(); 
        getSocketFromThreadLocalAndCloseIt(); 
       } 
      }); 
      return thread; 
     } 
     })); 

所以,你的任務將實施Runnable,看起來像:

public SocketWorker implements Runnable { 
    private final ThreadLocal<Socket> threadLocal; 
    public SocketWorker(ThreadLocal<Socket> threadLocal) { 
     this.threadLocal = threadLocal; 
    } 
    public void run() { 
     Socket socket = threadLocal.get(); 
     // use the socket ... 
    } 
} 
+1

不可以。每個線程只調用一次'ThreadFactory'。因此,如果您的線程池有10個線程,則僅在初始化池並啓動線程時才創建套接字。不適用於每個任務@SAFX。 – Gray

+0

記住@SAFX,newThread方法中的run()方法不是您的任務運行。這是ExecutorService用於出列任務並運行它們的運行方法。 – Gray

+0

是的,有點@SAFX。 'r.run()'是併發代碼中可運行的。它將您的任務從內部任務阻塞隊列中取出,並調用您的任務的run()(捕獲異常等)。我將添加一個示例任務類到我的答案。 – Gray

5

我想如果每個線程都有自己的插座應使用ThreadLocal

package com.stackoverflow.q16680096; 

import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 

public class Main 
{ 
    public static void main(String[] args) 
    { 
     ExecutorService pool = Executors.newCachedThreadPool(); 
     int nrOfConcurrentUsers = 100; 
     for(int i = 0; i < nrOfConcurrentUsers; i++) 
     { 
      pool.submit(new InitSocketTask()); 
     } 

     // do stuff... 

     pool.submit(new Task()); 
    } 
} 

package com.stackoverflow.q16680096; 

import java.net.Socket; 

public class InitSocketTask implements Runnable 
{ 
    public void run() 
    { 
     Socket socket = SocketPool.get(); 
     // Do initial setup here 
    } 

} 

package com.stackoverflow.q16680096; 

import java.net.Socket; 

public final class SocketPool 
{ 
    private static final ThreadLocal<Socket> SOCKETS = new ThreadLocal<Socket>(){ 
     @Override 
     protected Socket initialValue() 
     { 
      return new Socket(); // Pass in suitable arguments here... 
     } 
    }; 

    public static Socket get() 
    { 
     return SOCKETS.get(); 
    } 
} 

package com.stackoverflow.q16680096; 

import java.net.Socket; 

public class Task implements Runnable 
{ 
    public void run() 
    { 
     Socket socket = SocketPool.get(); 
     // Do stuff with socket... 
    } 
} 

+0

製作Socket ThreadLocal使它屬於線程而不是工作者,它使您可以直接在任務中訪問套接字。只要確保使用啓動套接字的InitTasks來加熱ThreadPool即可。然後,執行新任務時,ThreadPool中的所有線程都將準備好套接字。 – jontejj

+1

用工作代碼更新了答案。 – jontejj

+0

我用你的例子中的工作代碼更新了我的原始問題。你稱之爲'SocketPool'我在我的代碼中命名了'SocketTask',但它們都有一個靜態的'ThreadLocal '對象來獲取一個本地套接字對象,這看起來對你有效嗎? – raffian