2009-08-19 57 views
0

我不知道標題是否有意義,但在我寫的應用程序中有很多(擴展)方法。一個簡單的例子:自動執行循環

對象:

Matter (Burn, Explode, Destroy, Freeze, Heat, Cool) 
Atom (Attach, Detach) 
<many more> 

和一個自定義集合,如:

ImmutableList<T> 

和方法,像這樣:

public static class Burner 
{ 
    public static Matter Burn (this Matter matter) 
    { 
     // matter is burning ... 
    } 
} 

var matters = new ImmutableList<Matter>(); 
matters.Burn(); 

正如你可以看到,刻錄作品上一個單一的實體,但仍然出現在ImmutableList上。這樣我就可以自己管理並行處理(並行)。

我該如何做到最高效,最乾淨,最可維護的方式,還是結合在一起?

首先,我寧願不定義另一個需要每個類(燃燒器等)內的ImmutableList的擴展方法,因爲有成百上千的像這些,他們可能會看起來相同。但我對創意持開放態度。

此外,所有代碼都是我的,所以我可以在代碼的任何部分更改/添加任何內容,而不僅僅是擴展方法。

回答

2

這有什麼錯

matters.ForEach(Burner.Burn); 

與自己實現ForEach

+0

感謝但ForEach並不平行,我想管理自己的並行化以滿足應用程序的特定需求。如果可能的話,我寧願在我的文章中的語法。 – 2009-08-19 18:38:39

+0

也許我沒有正確理解你的問題,但是什麼阻止你實現自己的'ForEach'方法? – dtb 2009-08-19 18:40:29

+0

我可以,但我想有一個像這樣的語法:matters.Burn();否則代碼中會出現很多ForEach調用,並不是說這是一件壞事,但它很明顯,我想要一個「直接」調用,因此也就是擴展方法。 – 2009-08-19 18:42:01

0

創建自己的ForEachParallel擴展,然後,如果你不想使用PLINQ或東西

1

下面是一個簡單的類,以並行的方式進行迭代。

埃姆雷Aydinceren

用法:

Parallel.ForEach(事項,物質=> matter.Burn());

matters.ParallelForEach(物質=> matter.Burn());

/// <summary> 
/// Provides concurrent processing on a sequence of elements 
/// </summary> 
public static class Parallel 
{ 
    /// <summary> 
    /// Number Of parallel tasks 
    /// </summary> 
    public static int NumberOfParallelTasks; 


    static Parallel() 
    { 
     NumberOfParallelTasks = Environment.ProcessorCount < 65 ? Environment.ProcessorCount : 64; 
    } 

    /// <summary> 
    /// Performs the specified action on each element of the sequence in seperate threads. 
    /// </summary> 
    /// <typeparam name="T">The type of the elements of source.</typeparam> 
    /// <param name="source">A sequence that contains elements to perform action</param> 
    /// <param name="action">The Action delegate to perform on each element of the IEnumerable.</param> 
    public static void ForEach<T>(IEnumerable<T> source, Action<T> action) 
    { 
     if(source == null) return; 

     //create a new stack for parallel task we want to run , stack is very performant to add and read elements in sequence 
     var stacks = new Stack<T>[NumberOfParallelTasks]; 

     //instantiate stacks 
     for(var i = 0;i < NumberOfParallelTasks;i++) 
     { 
      stacks[i] = new Stack<T>(); 
     } 

     var itemCount = 0; 

     //spread items in source to all stacks while alternating between stacks 
     foreach(var item in source) 
     { 
      stacks[itemCount++ % NumberOfParallelTasks].Push(item); 
     } 

     if(itemCount==0)return; 

     //if we have less items than number of Parallel tasks we should only spun threads for active stacks 
     var activeStackCount = itemCount < NumberOfParallelTasks ? itemCount : NumberOfParallelTasks; 

     //events are used to notify thread pool completed 
     var events = new ManualResetEvent[activeStackCount]; 

     for(var index = 0;index < activeStackCount;index++) 
     { 
      //assign index to a scope variable otherwise in multithreading index will not be consistant 
      var listIndex = index; 

      events[listIndex] = new ManualResetEvent(false); 

      ThreadPool.QueueUserWorkItem(delegate 
      { 
       //name the thread for debugging 
       if(String.IsNullOrEmpty(Thread.CurrentThread.Name)) 
       { 
        Thread.CurrentThread.Name = String.Format("Parallel.ForEach Worker Thread No:{0}", listIndex); 
       } 

       try 
       { 
        //iterate through our stack 
        var stack = stacks[listIndex]; 
        foreach(var item in stack) 
        { 
         action(item); 
        } 
       } 
       finally 
       { 
        //fire the event to signal WaitHandle that our thread is completed 
        events[listIndex].Set(); 
       } 

      }); 
     } 

     WaitAll(events); 

    } 

    private static void WaitAll(WaitHandle[] waitHandles) 
    { 
     if(Thread.CurrentThread.GetApartmentState() == ApartmentState.STA) 
     { 
      for(var index = 0;index < waitHandles.Length;index++) WaitHandle.WaitAny(waitHandles); 
     } 
     else 
     { 
      WaitHandle.WaitAll(waitHandles); 
     } 
    } 

    /// <summary> 
    /// Performs the specified action on each element of the sequence in seperate threads. 
    /// </summary> 
    /// <typeparam name="T">The type of the elements of source.</typeparam> 
    /// <param name="source">A sequence that contains elements to perform action</param> 
    /// <param name="action">The Action delegate to perform on each element of the IEnumerable.</param> 
    public static void ParallelForEach<T>(this IEnumerable<T> source, Action<T> action) 
    { 
     ForEach(source, action); 
    } 

} 
+0

不應該只有一個併發感知堆棧任務和一組線程。當每個線程啓動時,它會檢查堆棧的工作情況。如果堆棧爲空,則退出。通過這種方式,有些工作很長,有些很短,最終不會有一個線程運行10個快速作業並退出,其他運行10個慢速作業本身。 – jmucchiello 2009-09-11 19:57:16

2

您可能會發現this article是一個有趣的閱讀。它討論了一個平行的foreach是如何工作的,既可以自己做,也可以使用.NET 3.5的Parallel extensions CTP。隨着CTP,你可以做到這一點(從文章上面採取爲例):

using System.Threading; 

// A simple string collection 
string[] numbers = { "One", "Two", "Three", "Four", "Five", "Six", "Seven", 
    "Eight", "Nine", "Ten", "Eleven", "Twelve", "Thirteen", "Fourteen", "Fifteen"}; 

// equivalent to: foreach (string n in numbers) 
Parallel.ForEach<string>(numbers, delegate(string n) 
{ 
    Console.WriteLine("n={0}", n.ToString()); 
}); 

你應該毫不猶豫地在生產代碼使用CTP,除非它只是你自己的項目(在這種情況下,你可能應該要嘗試CTP)。