2010-03-27 43 views
1

解釋有點複雜,但我們現在就去。基本上,問題是「如何以有效的方式將問題分解爲子問題」。這裏的「高效」意味着,破碎的子問題儘可能大。基本上,如果我根本不需要解決問題,那將是理想的。但是,因爲工人只能在特定的問題上工作,所以我需要分手。但我想找到儘可能粗糙的方法。分配工作的高效算法?

下面是一些僞代碼..

我們有這樣的問題(對不起這是在Java中,如果你不明白,我會很高興來解釋)。

class Problem { 
    final Set<Integer> allSectionIds = { 1,2,4,6,7,8,10 }; 
    final Data data = //Some data 
} 

而且一個子問題是:

class SubProblem { 
    final Set<Integer> targetedSectionIds; 
    final Data data; 

    SubProblem(Set<Integer> targetedSectionsIds, Data data){ 
     this.targetedSectionIds = targetedSectionIds; 
     this.data = data; 
    } 
} 

工作將這個樣子,然後。

class Work implements Runnable { 
    final Set<Section> subSections; 
    final Data data; 
    final Result result; 

    Work(Set<Section> subSections, Data data) { 
     this.sections = SubSections; 
     this.data = data; 
    } 

    @Override 
    public void run(){ 
     for(Section section : subSections){ 
      result.addUp(compute(data, section)); 
     } 
    } 
} 

現在我們有「工人」的情況下,有自己的狀態sections I have

class Worker implements ExecutorService { 
    final Map<Integer,Section> sectionsIHave; 
    { 
     sectionsIHave = {1:section1, 5:section5, 8:section8 }; 
    } 

    final ExecutorService executor = //some executor. 

    @Override 
    public void execute(SubProblem problem){ 
     Set<Section> sectionsNeeded = fetchSections(problem.targetedSectionIds); 
     super.execute(new Work(sectionsNeeded, problem.data); 
    } 

} 

phew。

所以,我們有很多Problem s和Workers不斷要求更多。我的任務是將​​分解爲SubProblem並提供給他們。然而,難點在於我必須稍後收集SubProblems的所有結果,並將它們合併(減少)爲整個ProblemResult。然而,這是昂貴的,所以我想給工人儘可能大的「塊」(儘可能多的targetedSections)。

它不一定是完美的(數學上儘可能高效或者什麼的)。我的意思是,我想不可能有完美的解決方案,因爲你無法預測每次計算需要多長時間,等等。但是有沒有一個很好的啓發式解決方案?或者,也許我可以在設計之前閱讀一些資源?

任何意見是高度讚賞!

編輯: 我們也控制部分分配,所以控制這是另一種選擇。基本上,對此的唯一限制是工人只能有固定數量的部分。

+0

我真的不知道它是否適用,因爲我沒有足夠的瞭解它,但叉/加入似乎是這樣做的算法。 http://www.ibm.com/developerworks/java/library/j-jtp11137.html – nicerobot 2010-03-27 21:50:45

+0

謝謝。我試圖讓我的頭在附近。但問題是,即使我使用這個框架,我仍然必須提供分割任務的邏輯等等。所以我仍然會遇到這個問題。 – 2010-03-27 22:42:08

+0

分佈式計算當然是一項不平凡的任務,而且一個活動高性能計算(HPC)研究領域。一本體面的大學文本是McGraw-Hill的Michael Quinn的「用MPI和OpenMP編寫C語言的並行編程」ISBM 0-07-282256-2 – 2010-03-28 03:59:05

回答

1

好吧,看起來你的網絡服務有一個分片模型,我們做類似的事情,我們使用「entityId」(sectionId)的反向索引到「client」(worker),它將連接到特定網絡服務將處理該特定實體。最簡單的方法(見下文)是對工作人員使用id的反向映射。如果內存是一個限制,另一種可能是使用一個函數(例如,sectionId%的服務數)。

爲了給服務儘可能多的工作,有一個簡單的批處理算法,將批量填充到某些用戶指定的最大值。這將允許根據遠程服務能夠使用它們的速度大致確定大小的工作量。

public class Worker implements Runnable { 

    private final Map<Integer, Section> sections; 
    private final BlockingQueue<SubProblem> problemQ = new ArrayBlockingQueue<SubProblem>(4096); 
    private final int batchSize; 

    public Worker(final Map<Integer, Section> sectionsIHave, final int batchSize) { 
     this.sections = sectionsIHave; 
     this.batchSize = batchSize; 
    } 

    public Set<Integer> getSectionIds() { 
     return sections.keySet(); 
    } 

    public void execute(final SubProblem command) throws InterruptedException { 

     if (sections.containsKey(command.getSectionId())) { 
      problemQ.put(command); 
     } else { 
      throw new IllegalArgumentException("Invalid section id for worker: " + command.getSectionId()); 
     } 

    } 

    @Override 
    public void run() { 
     final List<SubProblem> batch = new ArrayList<SubProblem>(batchSize); 
     while (!Thread.interrupted()) { 
      batch.clear(); 

      try { 
       batch.add(problemQ.take()); 
       for (int i = 1; i < batchSize; i++) { 
        final SubProblem problem = problemQ.poll(); 
        if (problem != null) { 
         batch.add(problem); 
        } else { 
         break; 
        } 

        process(batch); 
       } 
      } catch (final InterruptedException e) { 
       Thread.currentThread().interrupt(); 
      } 
     } 
    } 

    private void process(final List<SubProblem> batch) { 
     // Submit to remote process. 
    } 

    private static Map<Integer, Worker> indexWorkers(final List<Worker> workers) { 
     final Map<Integer, Worker> temp = new HashMap<Integer, Worker>(); 
     for (final Worker worker : workers) { 
      for (final Integer sectionId : worker.getSectionIds()) { 
       temp.put(sectionId, worker); 
      } 
     } 
     return Collections.unmodifiableMap(temp); 
    } 

    public static void main(final String[] args) throws InterruptedException { 
    // Load workers, where worker is bound to single remote service 
     final List<Worker> workers = getWorkers(); 
     final Map<Integer, Worker> workerReverseIndex = indexWorkers(workers); 
     final List<SubProblem> subProblems = getSubProblems(); 
     for (final SubProblem problem : subProblems) { 
      final Worker w = workerReverseIndex.get(problem.getSectionId()); 
      w.execute(problem); 
     } 
    } 
} 
+0

謝謝!但是你知道,問題是我們的'線程'有狀態(實際上它們是不同的物理機器)。如果我只是在中間切割部分,執行左或右的線程可能沒有那個「部分」... – 2010-03-28 17:13:40