2015-12-10 66 views
0

我正在設置一個用於使用線程將查詢發送到另一個服務器(LDAP)的服務器(Radius)的模擬器(用於測試)。 查詢需要在x每秒基礎上執行。 我正在使用可調用的調度線程池執行器來實現此目的,以便我可以創建可調用對象並將它們提交給線程池以供執行。 每個線程應該打開自己的連接並使用它進行查詢。 問題是我希望連接在每次使用時都被同一個線程重新使用。Java可調用線程:保持配置

澄清:

如果我還可以說20的線程池我想創建和使用20個連接。 (所以我可以發送讓我們說10.000查詢將由20個線程/連接依次處理)。

現在,要連接的(LDAP)服務器信息被髮送到可調用對象的構造函數,並且可調用對象設置連接以供執行。此後,我使用未來的可調用系統檢索結果。 這個問題是每次我創建一個可調用的連接正在打開(當然後來關閉)。

我正在尋找最佳實踐來保持連接處於活動狀態,並將它們重新用於每個線程。

我已經想到了一些方法來實現這一點,但他們似乎沒有非常有效:

  • 在需要的時候使用連接池我的線程池裏面獲取一個免費的連接(創建死鎖和其他線程安全問題)
  • 使用靜態(左右)陣列的連接和using the thread number檢索其連接(不犯規的證據要麼,見鏈接)

什麼是實現這一點的最有效的方法是什麼? < - 舊問題,請參閱編輯新問題。

編輯: 我在想,因爲我不能安全地獲得一個線程數,但threadId所始終是唯一的,我可以只使用一個

map<String/threadId, connection> 

而且整個地圖(參考)傳遞給調用。這樣,我可以使用類似於:(僞代碼)

Connection con = map.get(this.getThreadId()); 
If (con == null){ 
    con = new Connection(...); 
    map.put(this.getThreadId(), con) 
} 

也可以使地圖靜態並只是靜態訪問它。這樣我就不必將地圖傳遞給Callable。 這將至少是安全的,並不會強迫我重構我的代碼。

新問題: 什麼會更符合最佳實踐;上述解決方案還是Zim-Zam的解決方案? 如果以上是最好的,是不是更好地去靜態解決方案或不是

回答

1

我會使用被Callables之間共享的BlockingQueue,與ScheduledThreadPoolExecutorx查詢到BlockingQueue每秒

public class Worker implements Runnable { 
    private final BlockingQueue<Query> inbox; 
    private final BlockingQueue<Result> outbox; 

    public Worker(BlockingQueue<Query> inbox, BlockingQueue<Result> outbox) { 
     // create LDAP connection 
     this.inbox = inbox; 
     this.outbox = outbox; 
    } 

    public void run() { 
     try { 
      while(true) { 
       // waits for a Query to be available 
       Query query = inbox.take(); 
       // execute query 
       outbox.add(new Result(/* result */)); 
      } 
     } catch(InterruptedException e) { 
      // log and restart? close LDAP connection and return? 
     } 
    } 
} 

public class Master { 
    private final int x; // number of queries per second 
    private final BlockingQueue<Query> outbox = new ArrayBlockingQueue<>(4 * x); 
    private final BlockingQueue<Result> inbox = new ArrayBlockingQueue<>(4 * x); 
    private final ScheduledThreadPoolExecutor executor; 
    private final List<Future<?>> workers = new ArrayList<>(20); 
    private final Future<?> receiver; 

    public Master() { 
    // initialize executor 
    for(int i = 0; i < 20; i++) { 
     Worker worker = new Worker(inbox, outbox); 
     workers.add(executor.submit(worker)); 
    } 

    receiver = executor.submit(new Runnable() { 
     public void run() { 
      while(!Thread.interrupted()) { 
      try { 
       Result result = inbox.take(); 
       // process result 
      } catch(InterruptedException e) { 
       return; 
      } 
      } 
     } 
    } 
    } 

    executor.scheduleWithFixedDelay(new Runnable() { 
     public void run() { 
      // add x queries to the queue 
     } 
    }, 0, 1, TimeUnit.SECONDS); 
} 

使用BlockingQueue#add新添加Queriesoutbox實現這一點,如果拋出一個異常,那麼您的隊列已滿,您需要降低查詢創建速度和/或創建更多工作人員。要打破工人的無限循環cancel(true)Future,這將在Worker內部輸入InterruptedException

+0

謝謝你的回覆@ Zim-Zam! 它看起來不錯,但我需要添加兩件事: - 我使用callable,以便返回搜索結果。 - 因爲我使用callable我需要可調用(/ runnable/worker)來完成執行,因此可以返回結果。在你的例子中它將繼續運行 –

+1

@JBre我已經編輯了我的答案,現在有兩個隊列:'主'通過'BlockingQueue '發送'Query'對象並通過'BlockingQueue '接收'Result'對象' ,同樣'工人'接收'查詢'對象併發送'結果'對象 –

+0

感謝您的快速更新Zim-Zam! 它看起來不錯,完全可用!我當然會記住這個以備將來使用! 一個額外的評論,雖然這是關於我還沒有提到的東西;因爲這是一個需要執行一個測試(執行LDAP查詢)的測試應用程序,所有查詢執行完畢後我纔會收集結果。運行處理結果的額外線程會有點矯枉過正。 (甚至可能會稍微影響測試結果,因爲它佔用了會延遲其他線程的資源) –