2013-08-16 28 views
4

我已經構建了此代碼來並行處理大量字符串之間的字符串比較,以便更快。轉儲多線程訪問ConcurrentBag文件不夠快

我已經使用了ConcurrentBag,所以所有線程(任務)都可以寫入線程安全集合。然後我將這個集合轉儲到一個文件中。

我遇到的問題是我轉儲到文件的ConcurrentBag<string> log的填充速度比寫入文件的速度要快。所以我的程序不斷消耗越來越多的內存,直到內存耗盡。

我的問題是我能做些什麼?改進寫入日誌?暫停任務,直到ConcurrentBag被轉儲,然後恢復任務?什麼是最快的選擇?

下面是代碼:

CsvWriter csv = new CsvWriter(@"C:\test.csv"); 

List<Bailleur> bailleurs = DataLoader.LoadBailleurs(); 
ConcurrentBag<string> log = new ConcurrentBag<string>(); 
int i = 0; 

var taskWriteToLog = new Task(() => 
{ 
    // Consume the items in the bag 
    string item; 
    while (true) // (!log.IsEmpty) 
    { 
     if (!log.IsEmpty) 
     { 
      if (log.TryTake(out item)) 
      { 
       csv.WriteLine(item); 
      } 
      else 
       Console.WriteLine("Concurrent Bag busy"); 
     } 
     else 
     { 
      System.Threading.Thread.Sleep(1000); 
     } 
    } 
}); 

taskWriteToLog.Start(); 

Parallel.ForEach(bailleurs, s1 => 
{ 
    foreach (Bailleur s2 in bailleurs) 
    { 
     var lcs2 = LongestCommonSubsequenceExtensions.LongestCommonSubsequence(s1.Name, s2.Name); 
     string line = String.Format("\"LCS\",\"{0}\",\"{1}\",\"{2}\"", s1.Name, s2.Name, lcs2.Item2); 
     log.Add(line); 
     // Console.WriteLine(line); 

     var dic = DiceCoefficientExtensions.DiceCoefficient(s1.Name, s2.Name); 
     line = String.Format("\"DICE\",\"{0}\",\"{1}\",\"{2}\"", s1.Name, s2.Name, dic); 
     log.Add(line); 
     // Console.WriteLine(line); 
    } 
    i++; 
    Console.WriteLine(i); 
}); 

public class CsvWriter 
{ 
    public string FilePath { get; set; } 
    private FileStream _fs { get; set; } 
    private StreamWriter _sw { get; set; } 

    public CsvWriter2(string filePath) 
    { 
     FilePath = filePath; 
     _fs = new FileStream(FilePath, FileMode.Create, FileAccess.Write); 
     _sw = new StreamWriter(_fs); 
    } 

    public void WriteLine(string line) 
    { 
     _sw.WriteLine(line); 
    } 
} 

回答

7

不要直接使用併發袋,使用BlockingCollection具有併發袋作爲後備存儲(默認情況下它是一個併發隊列)。

其中一個constructor overloads可以設置集合大小的上限,如果袋子滿了,它將阻止插入線程直到有空間插入。

它還爲您提供了GetConsumingEnumerable(),使您可以非常輕鬆地從包中取出物品,您只需在foreach循環中使用該物品,它將繼續提供消費者數據,直至調用CompleteAdding。之後,它會一直運行,直到袋子變空,然後像已完成的其他正常IEnumerable一樣退出。如果在CompleteAdding被調用之前袋子「變幹」,它將阻塞線程並在更多數據放入袋子時自動重啓。

void ProcessLog() 
{ 
    CsvWriter csv = new CsvWriter(@"C:\test.csv"); 

    List<Bailleur> bailleurs = DataLoader.LoadBailleurs(); 

    const int MAX_BAG_SIZE = 500; 
    BlockingCollection<string> log = new BlockingCollection<string>(new ConcurrentBag<string>(), MAX_BAG_SIZE); 

    int i = 0; 

    var taskWriteToLog = new Task(() => 
    { 
     // Consume the items in the bag, no need for sleeps or poleing, When items are available it runs, when the bag is empty but CompletedAdding has not been called it blocks. 
     foreach(string item in log.GetConsumingEnumerable()) 
     { 
      csv.WriteLine(item); 
     } 
    }); 

    taskWriteToLog.Start(); 

    Parallel.ForEach(bailleurs, s1 => 
    { 
     //Snip... You can switch to BlockingCollection without any changes to this section of code. 
    }); 

    log.CompleteAdding(); //lets anyone using GetConsumingEnumerable know that no new items are comming so they can leave the foreach loops when the bag becomes empty. 
} 
+0

太棒了!它的作品非常好!非常感謝 –

+0

@ Arno2501知道,當你需要向相反的方向前進時,這個類也非常有用,一個編寫器放入包中,許多線程都調用'GetConsumingEnumberable()'在可用時進行工作。 'Parallel.ForEach(myBlockingCollection.GetConsumingEnumberable(),...)'工作得很好,我在使用'IDataReader'的項目中使用它很多,只能由一個線程訪問,但是結果中的工作可以是並行處理。 –

+0

非常有用的信息再次感謝。真棒看到並行處理變得越來越簡單:-) –

2

使用BlockingCollection而不是ConcurrentBag

BlockingCollection<string> log = new BlockingCollection<string>(); 
var item = log.Take(); 

在這種情況下Take直到項目被插入,你會不會有檢查log.IsEmpty將被阻止。這裏也將不再需要了Thread.Sleep

while (true) 
{ 
    var item = log.Take(); 
    //Do something with item...... 
} 
+0

甚至不要用'Take'懶得和'而(真)'使用'foreach'環和'GetConsumingEnumerable()' –

+0

這裏的問題似乎是,集合灌裝速度太快,不太慢了。'BlockingCollection'也可以幫助你,但你需要明確地設置它的容量。 – svick

+0

@svick由於'Thread.Sleep'可能會填滿太快。 BlockingCollection可能對此有所幫助... – I4V

0

首先,它看起來就像你正在編寫使用線路作爲自己的塊文件?

如果您可以將所有數據導入對象並將其寫出爲較大的塊,則會更快。目前,您可能正在創建您正在寫入的設備的最大IOPS。你的線條很小。所以你的寫模式看起來像4k Random IO ..或者更糟。

使用不同的集合不會改變磁盤寫入是最慢的事情的事實。

看着concurrentbag可能不是直接可能的,但是如果你可以從你的包中刪除行,並將它們連接成一個接近1-5MB的大字符串/字節數組,你應該提高性能。 (您可能需要插入CR LF的回字符串)。

+0

除非您將[FileOptions](http://msdn.microsoft.com/en-us/library/system.io.fileoptions.aspx)設置爲'WriteThrough',否則它將緩衝寫入,也會是順序寫入隨機寫道,他不是在尋求中間呼叫,所以表現不會那麼糟 –

+0

是的,斯科特是真的,IO一點都不差。我只能獲得比我能夠在相同時間內寫入更多的數據。 –

+0

足夠公平,我不確定線寫入的緩存行爲。 只要不在兩行之間沖洗,您可以放心地忽略我:D – Insanemal