2012-12-11 147 views
0

我有很多大小很大的文件。他們都需要解析,並且需要相當長的時間。所以,我提出了一個想法:當一個線程讀取一個文件(硬盤的讀取速度在這裏是瓶頸)時,另一個線程應該解析來自行的所需信息,並在解析發生時,文件讀取線程應該下一個文件等等。解析和讀取線程?

我只會創建兩個線程,一個通過File.ReadAllLines讀取所有文件,另一個線程解析返回的數組。但是這會消耗很多內存。因此,我需要將讀取的文件數量限制爲5。

我遇到的另一個問題是等待獲取線程過程完成。解析線程應該知道是否有準備解析數組。

問題是,我應該遵循什麼方式?有沒有一個例子(我找不到)?還是有更好的主意?

+0

解析代碼是什麼樣的?你對你解析的數據做什麼? – Steve

+0

我通過檢查程序的輸出文件的每一行來解析變量。並非所有的行都被讀取一次。因爲如果滿足一些條件,我必須回到第n行。我正在將每個信息獲取到一個變量並在DataGridViews中顯示它們。 – theGD

+0

好的,也許你可以發佈你的順序代碼,以便我們更好地瞭解你正在做的事情。此外,有些單個文件非常大,或者你可以將整個文件讀入字符串數組並在內存中進行處理,而不是逐行讀取? – Steve

回答

1

如果你是肯定的,那解析總是比閱讀速度快得多,你可以做到這一點很簡單: 線程A(只是一個線程不會阻止UI線程)讀取文件,然後啓動一個新的線程B和將文件內容傳遞給它(使用任務而不是線程使其更容易)。把它放到一個循環中,你就完成了。 由於解析速度更快,第二個線程/任務將在線程A啓動新線程之前完成。所以你只能同時運行兩個線程和兩個內存中的文件。

等待獲取線程過程完成。解析線程應該知道是否有準備解析數組。

不知道我是否理解正確,但可以通過上述「解決方案」解決。因爲你總是開始一個新的線程/任務,WHEN並且只有當文件被完全讀取時。

更新:如果處理不是(總是)比讀取速度更快,你可能例如像這樣做:

Private MaxTasks As Integer = 4 

Private Async Sub ReadAndProcess(ByVal FileList As List(Of String)) 

    Dim ProcessTasks As New List(Of Task) 

    For Each fi In FileList 
     Dim tmp = fi 
     Console.WriteLine("Reading {0}", tmp) 
     Dim FileContent = Await Task.Run(Of Byte())(Function() As Byte() 
                 Return File.ReadAllBytes(tmp) 
                End Function) 
     If ProcessTasks.Count >= MaxTasks Then 
      Console.WriteLine("I have to wait!") 
      Dim NextReady = Await Task.WhenAny(ProcessTasks) 
      ProcessTasks.Remove(NextReady) 
     End If 

     Console.WriteLine("I can start a new process-task!") 
     ProcessTasks.Add(Task.Run(Sub() 
             Console.WriteLine("Processing {0}", tmp) 
             Dim l As Long 
             For Each b In FileContent 
              l += b 
             Next 
             System.Threading.Thread.Sleep(2000) 
             Console.WriteLine("Done with {0}", tmp) 
            End Sub)) 
    Next 

    Await Task.WhenAll(ProcessTasks) 

End Sub 

Private Async Sub Button1_Click(sender As Object, e As EventArgs) Handles Button1.Click 

    Dim ofd As New OpenFileDialog 
    ofd.Multiselect = True 
    If ofd.ShowDialog = Windows.Forms.DialogResult.OK AndAlso ofd.FileNames.Count >= 1 Then 
     ReadAndProcess(ofd.FileNames.ToList) 
    End If 

End Sub 

的想法(這通常作爲可在4種方式的.Net實現)簡單地說,你安排新的處理任務,直到你達到你自己設定的限制。如果達到了這個目標,你就可以「等待」,直到任務準備就緒並開始新的任務。

UPDATE2:隨着TPL LIB它可能看起來像:

Private Sub Doit() 

    Dim ABProcess As New ActionBlock(Of Tuple(Of String, Byte()))(Sub(tp) 
                     Console.WriteLine("Processing {0}", tp.Item1) 
                     Dim l As Long 
                     For Each el In tp.Item2 
                      l += el 
                     Next 
                     System.Threading.Thread.Sleep(1000) 
                     Console.WriteLine("Done with {0}", tp.Item1) 
                    End Sub, New ExecutionDataflowBlockOptions With {.MaxDegreeOfParallelism = 4, .BoundedCapacity = 4}) 

    Dim ABRead As New ActionBlock(Of String())(Async Sub(sarr) 
                For Each s In sarr 
                 Console.WriteLine("Reading {0}", s) 
                 Dim t = New Tuple(Of String, Byte())(s, File.ReadAllBytes(s)) 
                 Dim taken = Await ABProcess.SendAsync(t) 
                 Console.WriteLine("Output taken = {0}", taken) 
                Next 
                Console.WriteLine("All reading done") 
               End Sub) 

    Dim ofd As New OpenFileDialog 
    ofd.Multiselect = True 
    If ofd.ShowDialog = Windows.Forms.DialogResult.OK Then 
     ABRead.Post(ofd.FileNames) 
    End If 

End Sub 

哪個版本是「更好」 ......可能是個人口味;)我個人可能更喜歡「手動」版本,因爲新的TPL塊有時非常黑匣子。

+0

解析速度更快並不一定。 – theGD

+0

@theGD:更新了我的答案。當然,有兩種以上的方式來殺死一隻貓。 – igrimpe

0

可能取決於所需的處理,通過分割任務來提高性能。整個過程聽起來與生產者/消費者設置類似。

您可能想要檢出Blocking Queue排隊處理。這個想法是讓讀線程隊列項目和處理線程去隊列和處理項目。

1

我有一個非常類似的應用和使用BlockingCollection

BlockingCollection Overview

在我的情況分析是比讀取速度更快,但我有問題的文件是不一樣的大小,以便閱讀解析可以等待。
使用BlockingCollection,隊列大小爲8管理大小差異並保持內存不變。
如果解析速度較慢,也可以將解析設置爲並行。
如果你正在閱讀一個單一的頭,然後平行讀並不會幫助。

static void Main(string[] args) 
{ 
    // A blocking collection that can hold no more than 5 items at a time. 
    BlockingCollection<string[]> fileCollection = new BlockingCollection<string[]>(5); 

    // Start one producer and one consumer. 
    Task.Factory.StartNew(() => NonBlockingConsumer(fileCollection)); // parse - can use parallel 
    Task.Factory.StartNew(() => NonBlockingProducer(fileCollection)); // read 
} 

什麼是解析的本質是什麼?
您一次解析一行嗎?
我會在並行解析文件之前查看並行解析行。
Parallel.ForEach Method