0

我試圖通過Parallel.ForEach將處理的數據添加到BlockingCollection來處理大量的文本文件。可以將(true)循環轉換爲EventWaitHandle嗎?

問題是我想要Task taskWriteMergedFile消耗集合並將它們寫入結果文件至少每隔800000行。

我想我不能在迭代中測試集合的大小,因爲它是平行的,所以我創建了Task

在這種情況下,我可以將任務中的(true)循環轉換爲EventWaitHandle嗎?

const int MAX_SIZE = 1000000; 
static BlockingCollection<string> mergeData; 
mergeData = new BlockingCollection<string>(new ConcurrentBag<string>(), MAX_SIZE); 


string[] FilePaths = Directory.GetFiles("somepath"); 

var taskWriteMergedFile = new Task(() => 
{ 
    while (true) 
    { 
     if (mergeData.Count > 800000) 
     { 
      String.Join(System.Environment.NewLine, mergeData.GetConsumingEnumerable()); 
      //Write to file 
     } 
     Thread.Sleep(10000); 
    } 
}, TaskCreationOptions.LongRunning); 

taskWriteMergedFile.Start(); 
Parallel.ForEach(FilePaths, FilePath => AddToDataPool(FilePath)); 
mergeData.CompleteAdding(); 

回答

1

你可能不想這樣做。相反,讓您的任務在收到文件時將每行寫入文件。如果要將文件大小限制爲80,000行,則在寫入第80,000行之後,關閉當前文件並打開一個新文件。

想想吧,你有什麼不能工作,因爲GetConsumingEnumerable()不會停止,直到集合標記爲完成添加。會發生什麼情況是,直到主機線程調用CompleteAdding,直到隊列中有80,000個項目纔會通過睡眠循環,然後阻塞在String.Join上。有足夠的數據,你會用完內存。

另外,除非你有很好的理由,否則你不應該在這裏使用ConcurrentBag。只需使用BlockingCollection的默認值即ConcurrentQueue即可。 ConcurrentBag是一個相當特殊的數據結構,其性能不如ConcurrentQueue

所以,你的任務就變成了:

var taskWriteMergedFile = new Task(() => 
{ 
    int recordCount = 0; 
    foreach (var line in mergeData.GetConsumingEnumerable()) 
    { 
     outputFile.WriteLine(line); 
     ++recordCount; 
     if (recordCount == 80,000) 
     { 
      // If you want to do something after 80,000 lines, do it here 
      // and then reset the record count 
      recordCount = 0; 
     } 
    } 
}, TaskCreationOptions.LongRunning); 

,它假定,當然,前提是你已經打開輸出文件的其他地方。在任務開始時打開輸出可能會更好,並在foreach退出後關閉它。

另一個說明,你可能不希望你的生產者循環是平行的。您有:

Parallel.ForEach(FilePaths, FilePath => AddToDataPool(FilePath)); 

我不肯定知道AddToDataPool正在做呢,但如果它讀取文件和寫入數據的收集,你有一對夫婦的問題。首先,磁盤驅動器一次只能做一件事,所以它最終會讀取一個文件的一部分,然後是另一個文件的一部分,然後是另一個文件的一部分,等等。爲了讀取下一個文件的每個塊,它必須尋找頭部到適當的位置。磁盤磁頭尋找代價非常昂貴 - 5毫秒或更多。 CPU時間的永恆。除非您正在執行比讀取文件花費更長時間的繁重處理,否則您幾乎總是最好一次處理一個文件。除非您可以保證輸入文件位於單獨的物理磁盤上。 。 。

第二個潛在的問題是,在多線程運行的情況下,不能保證事物寫入集合的順序。當然,這可能不是問題,但是如果您希望單個文件中的所有數據在輸出中組合在一起,那麼不會發生多個線程,每個線程都將多行寫入集合。

只是要記住。