2017-01-19 31 views
2

我有以下問題,如何ThreadPoolExecutor的分配線程之間的任務

我的任務隊列,並且有很多的類型,如任務:

A, B, C, D, ... 

我執行這些任務的線程池。

但我必須在同一時間,限制同一類型的任務執行,因此,這是不好的:

Thread-1: [A, D, C, B, ...] 
Thread-2: [A, C, D, B, ...] 

A和B型的任務可以在同一時間執行。

但是這是件好事:

Thread-1: [A,B,A,B,...] 
Thread-2: [C,D,D,C,...] 

因此,相同類型的任務總是順序執行。

實現此功能的最簡單方法是什麼?

+0

你的問題很含糊,不知道我明白了。但是你可以考慮只使用2個獨立的線程池。 – Jamie

+0

@Jamie這是非常笨拙的解決方案。 – corvax

+0

這也可以嗎?線程1 [A1,A2,B1,B2]? – john16384

回答

0
CompletableFuture.supplyAsync(this::doTaskA) 
       .thenAccept(this::useResultFromTaskAinTaskB); 

發生了什麼事上面是任務A和相關任務B實際上是在同一個線程中運行(一前一後,沒必要「搞定」一個新的線程開始運行任務B)。

或者你可以使用runAsync任務A,如果你不需要從它的任何信息,但確實需要等待它運行的任務B.

默認情況下之前完成,CompletableFuture的將使用普通螺紋池,但是如果您想要更多地控制使用哪個ThreadPool,則可以使用您自己的使用自己的ThreadPool的Executor將第二個參數傳遞給async方法。

1

這個問題很容易就可以通過像Akka這樣的演員框架來解決。

對於每種類型的任務。創建一個演員。

對於每個單獨的任務,創建一條消息並將其發送給相應類型的參與者。消息可以是Runnable類型的,因爲他們可能是現在,和演員的反應方法可以 @Override public void onReceive(Object msg) { ((Runnable)msg).run(); }

這樣,你的程序將正常運行任意數量的線程。

+0

是的,Akka可以很容易地解決這個問題,但是我認爲整合AKK解決這個問題的開銷很大。 – corvax

1

我想你可以實現你自己的DistributedThreadPool來控制線程。這就像某種主題訂閱者/發佈者結構。

我做了一個例子如下:

class DistributeThreadPool { 

Map<String, TypeThread> TypeCenter = new HashMap<String, TypeThread>(); 

public void execute(Worker command) { 
    TypeCenter.get(command.type).accept(command); 
} 

class TypeThread implements Runnable{ 

    Thread t = null; 
    LinkedBlockingDeque<Runnable> lbq = null; 

    public TypeThread() { 
     lbq = new LinkedBlockingDeque<Runnable>(); 
    } 

    public void accept(Runnable inRun) { 
     lbq.add(inRun); 
    } 

    public void start() { 
     t = new Thread(this); 
     t.start(); 
    } 


    @Override 
    public void run() { 
     while (!Thread.interrupted()) { 
      try { 
       lbq.take().run(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 
} 

public DistributeThreadPool(String[] Types) { 
    for (String t : Types) { 
     TypeThread thread = new TypeThread(); 
     TypeCenter.put(t, thread); 
     thread.start(); 
    } 
} 

public static void main(String [] args) { 
     DistributeThreadPool dtp = new DistributeThreadPool(new String[] {"AB","CD"}); 

     Worker w1 = new Worker("AB",()->System.out.println(Thread.currentThread().getName() +"AB")); 
     Worker w2 = new Worker("AB",()->System.out.println(Thread.currentThread().getName() +"AB")); 
     Worker w3 = new Worker("CD",()->System.out.println(Thread.currentThread().getName() +"CD")); 
     Worker w4 = new Worker("CD",()->System.out.println(Thread.currentThread().getName() +"CD")); 
     Worker w5 = new Worker("CD",()->System.out.println(Thread.currentThread().getName() +"CD")); 

     List<Worker> workers = new ArrayList<Worker>(); 
     workers.add(w1); 
     workers.add(w2); 
     workers.add(w3); 
     workers.add(w4); 
     workers.add(w5); 

     workers.forEach(e->dtp.execute(e)); 
    } 
} 
+0

如果有1000種類型的任務會怎樣? – Anton

+0

你的意思是你想要有1000種類型的任務在同一時間順序運行? @Anton – Chuck

0

有趣的問題。想到兩個問題:

有多少種不同類型的任務?

如果相對較少,最簡單的方法可能是爲每種類型創建一個線程並將每個傳入任務分配給它的線程類型。只要類型之間的任務是平衡的(這是一個很大的假設),利用率就足夠好了。

任務完成的預期時間/延遲是多少?

如果你的問題是及時性靈活,你可以通過數或時間間隔各種批次傳入任務,提交每個你退休池批次,然後等待批完成後提交另一份同類型的。

您可以調整第二種替代方式,使其與批量一樣小,在這種情況下,等待完成的機制對於效率而言非常重要。 CompletableFuture將符合這裏的法案;你可以將「輪詢A類型的下一個任務並提交給池」操作鏈接到thenRunAsync的任務中,然後開始執行任務。

您必須爲每個任務類型維護一個外部任務隊列; FJ池的工作隊列將僅用於進行中的任務。儘管如此,這種設計很有可能合理處理每種類型的任務數量和工作量不平衡。

希望這會有所幫助。

0

實現按鍵執行器。每項任務都應有鑰匙。具有相同密鑰的任務將排隊並將被連續執行,具有不同密鑰的任務將並行執行。

Implementation in netty

你可以嘗試自己做它,但它是棘手而且容易出錯。我可以在那裏看到幾個答案。

相關問題