2012-12-07 62 views
4

我想創建一個Erastosthenes Java程序的簡單並行Sieve,至少比我下面發佈的串行版本更有效一些。Java多線程 - Eratosthenes Sieve的基本並行示例

public void runEratosthenesSieve(int upperBound) { 
     int upperBoundSquareRoot = (int) Math.sqrt(upperBound); 
     boolean[] isComposite = new boolean[upperBound + 1]; 
     for (int m = 2; m <= upperBoundSquareRoot; m++) { 
      if (!isComposite[m]) { 
        System.out.print(m + " "); 
       int threads=4; 
       for (int n=1; n<=threads; n++) { 
        int job; 
        if (n==1) {job = m * m;} else {job = (n-1)*upperBound/threads;} 
        int upToJob = n*upperBound/threads; 
        for (int k = job; k <= upToJob; k += m) 
        { 
         isComposite[k] = true; 
        }    
       } 
      } 
     } 
     for (int m = upperBoundSquareRoot; m <= upperBound; m++) 
      if (!isComposite[m]) 
        System.out.print(m + " "); 
    } 

我已經創建了一個循環來分割4個線程的工作。雖然我不知道如何從中創建實際的線程代碼。如何發送變量併爲每個線程啓動4個線程。

+1

您可能想查看[Fork/Join](http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html)框架。 –

+0

我已更新我的問題。 –

回答

4

我可以提出以下解決方案:有4個工作線程和1個主線程。工作線程從隊列中獲取作業。工作基本上是3個數字:從,到,一步。主人的意思,而必須等到所有線程完成。完成後,它將搜索下一個素數並創建4個作業。主人和工人之間的同步可以通過使用Semaphore來實現:主人嘗試獲得4個許可證,而每個工作人員在完成時釋放1個許可證。

public class Sieve { 

    // Number of workers. Make it static for simplicity. 
    private static final int THREADS = 4; 

    // array must be shared between master and workers threads so make it class property. 
    private boolean[] isComposite; 
    // Create blocking queue with size equal to number of workers. 
    private BlockingQueue<Job> jobs = new ArrayBlockingQueue<Job>(THREADS); 
    private Semaphore semaphore = new Semaphore(0); 
    // Create executor service in order to reuse worker threads. 
    // we can use just new Thread(new Worker()).start(). But using thread pools more effective. 
    private ExecutorService executor = Executors.newFixedThreadPool(THREADS); 

    public void runEratosthenesSieve(int upperBound) { 
     int upperBoundSquareRoot = (int) Math.sqrt(upperBound); 
     isComposite = new boolean[upperBound + 1]; 

     // Start workers. 
     for (int i = 0; i < THREADS; i++) { 
      executor.submit(new Worker()); 
     } 
     for (int m = 2; m <= upperBoundSquareRoot; m++) { 
      if (!isComposite[m]) { 
       System.out.print(m + " "); 
       for (int n=1; n<= THREADS; n++) { 
        int from; 
        if (n == 1) { 
         from = m * m; 
        } else { 
         from = (n-1)*upperBound/THREADS; 
        } 
        Job job = new Job(from, n*upperBound/threads, m); 
        // Submit job to queue. We don't care which worker gets the job. 
        // Important only that only 1 worker get the job. But BlockingQueue does all synchronization for us. 
        jobs.put(job); 
       } 
       // Wait until all jobs are done. 
       semaphore.acquire(THREADS); 
      } 
     } 
     for (int i = 0; i < n; i++) { 
      // put null to shutdown workers. 
      jobs.put(null); 
     } 
     for (int m = upperBoundSquareRoot; m <= upperBound; m++) { 
      if (!isComposite[m]) { 
       System.out.print(m + " "); 
      } 
     } 
    } 

    private class Job { 
     public int from, to, step; 

     public Job(int from, int to, int step) { 
      this.from = from; 
      this.to = to; 
      this.step = step; 
     } 
    } 

    private Worker implements Runnable { 
     while (true) { 
      Job job = jobs.take(); 
      // null means workers must shutdown 
      if (job == null) { 
       return; 
      } 
      for (int i = job.from; i <= job.to; i += job.step) { 
       isComposite[i] = true; 
      } 
      // Notify master thread that a job was done. 
      semaphore.release(); 
     } 
    } 
} 
+0

巨大的感謝!正是我在找什麼! –

+0

我唯一不知道的是jobs.put()和jobs.take()方法是如何工作的。 –

+0

閱讀有關BlockingQueue的文檔。基本上把增加的元素放入隊列中,從隊列或塊中返回第一個元素,直到有人放入元素爲止。 –