2011-03-21 70 views
0

我有一個列表只讀數據是我想要處理的互聯網上的一串端點。我想知道是否有任何類型的內置類或模式,我應該遵循來處理?使用線程處理列表的智能方式

現在我只是跳水的初始列表爲ñ塊和創造ñ線程來處理每個請求。

回答

4

使用ExecutorService來處理您的併發處理。

public void processAll(List<Endpoint> endpoints, int numThreads) { 
    ExecutorService executor = Executors.newFixedThreadPool(numThreads); 

    for(final Endpoint endpoint : endpoints) { 
     executor.submit(new Runnable() { 
      @Override 
      public void run() { 
       doProcessing(endpoint); 
      } 
     }); 
    } 
    // instead of a loop, you could also use ExecutorService#invokeAll() 
    // with the passed-in list if Endpoint implements 
    // java.util.concurrent.Callable 

    executor.shutdown(); 
} 

private void doProcessing(Endpoint endpoint) { 
    // do whatever you do with each one 
} 

這只是一個簡單的例子。查看一些關於如何使用更具體的ExecutorService類型的示例的API,處理Futures,並做各種漂亮的東西。

0

查看java.util.concurrent包和ExecutorService。

Brian Goetz的書Java Concurrency in Practice是理解這些東西的必備工具。

2

聽起來像是Queue(使用java.util.concurrent中的一個實現)就是你需要的。這樣,每個線程在準備就緒時就可以獲得鏈接,這比事先進行分區更有意義。

0

阻塞隊列可能是最適合您的方式。谷歌它,你會發現很多的信息,這是一個很好的教程:http://www.developer.com/java/ent/article.php/3645111/Java-5s-BlockingQueue.htm

1

您將需要三認爲:

  • 兩個阻塞列表 - 用數據來porcess的結果第一,第二
  • 執行人服務
  • 某種鎖

我的示例應用程序:

public class App { 

    private static final int NUMBER_OF_THREADS = 3; 

    public static void main(String[] args) throws InterruptedException { 

     BlockingQueue<String> data = prepareData(); 

     BlockingQueue<String> results = new LinkedBlockingQueue<String>(); 

     ExecutorService executor = Executors.newFixedThreadPool(3); 
     CountDownLatch countDownLatch = new CountDownLatch(3); 

     for (int i = 0; i < NUMBER_OF_THREADS; i++) 
      executor.execute(new Processor<String>(data, results, 
        countDownLatch, i + "")); 

     countDownLatch.await(); 
    } 

    private static BlockingQueue<String> prepareData() { 
     BlockingQueue<String> queue = new LinkedBlockingQueue<String>(); 
     for (int i = 0; i < 1000; i++) { 
      queue.add(i + ""); 
     } 
     return queue; 
    } 

} 

class Processor<T> implements Runnable { 

    private BlockingQueue<T> dataSource; 

    private CountDownLatch latch; 

    private String name; 

    private BlockingQueue<String> results; 

    public Processor(BlockingQueue<T> dataSource, 
      BlockingQueue<String> results, CountDownLatch countDownLatch, 
      String processName) { 
     this.dataSource = dataSource; 
     this.results = results; 
     this.latch = countDownLatch; 
     this.name = processName; 
    } 

    @Override 
    public void run() { 
     T t = null; 
     while ((t = dataSource.poll()) != null) { 
      try { 
       String result = "Process " + name + " processing: " 
         + t.toString(); 
       System.out.println(result); 
       results.put(result); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
     latch.countDown(); 
    } 
} 

準備好數據後創建一些處理器來處理數據。每個處理器都有對線程保存數據源的引用。獲取對象,處理它們並最終將結果放到另一個包含結果的線程保存集合中。

當數據源變空時,然後調用latch.countDown()向等待結果的主線程或線程說「完成了一切」。