2010-02-22 97 views
17

我有一個Java線程像下面這樣:返回值從Java線程

public class MyThread extends Thread { 
     MyService service; 
     String id; 
     public MyThread(String id) { 
      this.id = node; 
     } 
     public void run() { 
      User user = service.getUser(id) 
     } 
    } 

我有大約300 IDS,每幾秒鐘 - 我火了線程,使每個ID的呼叫。例如。現在

for(String id: ids) { 
    MyThread thread = new MyThread(id); 
    thread.start(); 
} 

,我想從每個線程收集結果,並做了批量插入到數據庫中,而不是使300數據庫插入每2秒。

任何想法我可以做到這一點?

回答

17

如果你想要做的數據庫更新前收集所有的結果,你可以使用invokeAll方法。如果您一次提交一個任務,這將照顧到需要的簿記,如daveb暗示。

private static final ExecutorService workers = Executors.newCachedThreadPool(); 

... 

Collection<Callable<User>> tasks = new ArrayList<Callable<User>>(); 
for (final String id : ids) { 
    tasks.add(new Callable<User>() 
    { 

    public User call() 
     throws Exception 
    { 
     return svc.getUser(id); 
    } 

    }); 
} 
/* invokeAll blocks until all service requests complete, 
* or a max of 10 seconds. */ 
List<Future<User>> results = workers.invokeAll(tasks, 10, TimeUnit.SECONDS); 
for (Future<User> f : results) { 
    User user = f.get(); 
    /* Add user to batch update. */ 
    ... 
} 
/* Commit batch. */ 
... 
+0

就我而言,某些服務調用可能永遠不會返回或返回時間太長。所以'invokeAll'和'awaitTermination(長時間超時)'看起來像是要走的路。因此我接受了這個答案。 希望我也能接受@ daveb的回答。 – Langali 2010-02-22 22:11:08

+0

在這種情況下,您可以使用帶有超時參數的'invokeAll'的重載版本。我會更新我的答案以顯示如何。 – erickson 2010-02-22 22:15:27

+0

謝謝。我不知何故忽略了它。 – Langali 2010-02-22 22:18:34

34

的典型方法是使用一個CallableExecutorServicesubmitCallableExecutorService返回(類型安全)Future從中可以get結果。

class TaskAsCallable implements Callable<Result> { 
    @Override 
    public Result call() { 
     return a new Result() // this is where the work is done. 
    } 
} 

ExecutorService executor = Executors.newFixedThreadPool(300); 
Future<Result> task = executor.submit(new TaskAsCallable()); 
Result result = task.get(); // this blocks until result is ready 

在你的情況,你可能想爲你添加任務執行人使用invokeAll返回的Futures一個List,或創建一個列表你自己。要收集結果,只需致電get

+0

好後,一爲細節。 – fastcodejava 2010-02-22 21:56:51

1

您需要將結果存儲爲類似單例的結果。這必須正確同步。

編輯:我知道這不是最好的建議,因爲處理生Threads不是個好主意。但考慮到這個問題會起作用,不是嗎?我可能沒有投票,但爲什麼投票呢?

1

你可以創建你傳遞給你創建的線程隊列或列表,線程的結果添加到它獲取由消費者執行的批量插入清空列表。

1

最簡單的方法是將對象傳遞給每個線程(每個線程一個對象)以後將包含結果。主線程應該保留對每個結果對象的引用。當所有線程連接在一起時,您可以使用結果。

1
public class TopClass { 
    List<User> users = new ArrayList<User>(); 
    void addUser(User user) { 
     synchronized(users) { 
      users.add(user); 
     } 
    } 
    void store() throws SQLException { 
     //storing code goes here 
    } 
    class MyThread extends Thread { 
      MyService service; 
      String id; 
      public MyThread(String id) { 
       this.id = node; 
      } 
      public void run() { 
       User user = service.getUser(id) 
       addUser(user); 
      } 
     } 
} 
1

您可以創建一個擴展Observable的類。然後你的線程可以調用Observable類中的方法,通過調用Observable.notifyObservers(Object)來通知在該觀察者中註冊的任何類。

觀測類將執行觀測,並與可觀察的註冊。然後,您將實現一個更新(Observable,Object)方法,當Observerable.notifyObservers(Object)被調用時被調用。

4

將結果存儲在您的對象中。當它完成後,讓它自己進入同步集合(想到同步隊列)。

當你想收集你的結果提交,抓住一切從隊列中,並從對象看你的結果。你甚至可以讓每個對象知道如何將自己的結果「發佈」到數據庫中,這樣就可以提交不同的類,並且可以使用完全相同的微小優雅循環來處理所有類。

JDK中有很多工具可以幫助解決這個問題,但是一旦開始將線程看作是一個真正的對象,而不是圍繞「運行」方法的一堆廢話,這真的很容易。一旦你開始思考對象,這種編程變得更簡單和更令人滿意。

+0

+1。我最初的想法很相似,但Java 5現在變得更簡單了! – Langali 2010-02-22 22:41:07

+0

+1。那好美麗。 – 2010-02-22 22:53:09

+0

這是針對一個B類地址範圍的系統做的。它非常優雅,從幾小時減少到幾分鐘。 (好吧,從Ping也變成了一個SNMP get,Java並不真正支持Ping) – 2010-02-23 00:55:12

2

在Java8中,使用CompletableFuture有更好的方法。假設我們有類從數據庫中獲取的ID,爲簡單起見,我們可以只返回下面的數字,

static class GenerateNumber implements Supplier<Integer>{ 

    private final int number; 

    GenerateNumber(int number){ 
     this.number = number; 
    } 
    @Override 
    public Integer get() { 
     try { 
      TimeUnit.SECONDS.sleep(1); 
     }catch (InterruptedException e){ 
      e.printStackTrace(); 
     } 
     return this.number; 
    } 
} 

現在我們可以將結果添加到併發收集一次未來的結果是準備好了。

Collection<Integer> results = new ConcurrentLinkedQueue<>(); 
int tasks = 10; 
CompletableFuture<?>[] allFutures = new CompletableFuture[tasks]; 
for (int i = 0; i < tasks; i++) { 
    int temp = i; 
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()-> new GenerateNumber(temp).get(), executor); 
    allFutures[i] = future.thenAccept(results::add); 
} 

現在,我們可以添加一個回調時,所有的期貨準備就緒,

CompletableFuture.allOf(allFutures).thenAccept(c->{ 
    System.out.println(results); // do something with result 
});