2013-02-27 69 views
0

我編寫了一個小工具來讀取大型文本文件並搜索包含搜索詞的行。我將此作爲學習TPL數據流的機會。任務無法運行

代碼工作正常,除非搜索條件接近文件的末尾。在這種情況下,除非中存在斷點,否則uiResult操作塊不會被調用

我的理解是,數據是從searcher張貼uiResultsearcher成爲完整的(它已處理上一個數據塊)之後。由於數據已發佈到uiResult,所以在數據處理完成之後才能完成。

問題

爲什麼是它uiResult變得完整,即使數據被張貼到它(除非斷點在uiResult設置)?

代碼

下面是相關的代碼,重新修整地:

ActionBlock<LineInfo> uiResult = new ActionBlock<LineInfo>(li => 
    { 
     // If match found near end of file, the following line only runs 
     // if a breakpoint is set on it: 
     if (results != null) results.Add(li); 
    }, 
    new ExecutionDataflowBlockOptions() 
    { 
     MaxDegreeOfParallelism = 1, 
     CancellationToken = cancelSource.Token, 
     TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() 
    }); 

BatchBlock<LineInfo> batcher = new BatchBlock<LineInfo>(5000); 

ActionBlock<LineInfo[]> searcher = new ActionBlock<LineInfo[]>(lines => 
    { 
     foreach (LineInfo li in lines) 
     { 
      if (li.TextOfLine.Contains(searchTerm)) 
      { 
       uiResult.Post(li); 
      } 
     } 
    }, 
    new ExecutionDataflowBlockOptions() 
    { 
     MaxDegreeOfParallelism = 1, 
     CancellationToken = cancelSource.Token 
    }); 

batcher.LinkTo(searcher); 

batcher.Completion.ContinueWith(t => 
{ 
    if (t.IsFaulted) ((IDataflowBlock)searcher).Fault(t.Exception); 
    else searcher.Complete(); 

    if (t.IsFaulted) ((IDataflowBlock)uiResult).Fault(t.Exception); 
    else uiResult.Complete(); 
}); 

Task.Run(() => 
    { 
     using (FileStream fs = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite)) 
     using (BufferedStream bs = new BufferedStream(fs)) 
     using (StreamReader sr = new StreamReader(bs)) 
     { 
      string line; 
      while ((line = sr.ReadLine()) != null && cancelSource.IsCancellationRequested == false) 
      { 
       batcher.Post(new LineInfo() { LineNumber = lineNumber, OffsetOfLine = offset, TextOfLine = line }); 
      } 

      batcher.Complete(); 
      try 
      { 
       searcher.Completion.Wait(); 
       uiResult.Completion.Wait(); 
      } 
      catch (AggregateException ae) 
      { 
       TaskCanceledException taskCancelled = ae.InnerException as TaskCanceledException; 
       if (taskCancelled != null) 
       { 
        // Swallow the Exception if is just a user cancellation 
        throw; 
       } 
      } 
      finally 
      { 
       signalDone(); 
      } 
     } 
    }); 
+0

你真的說設置斷點會改變行爲嗎?這非常可疑。 – svick 2013-02-27 20:38:29

+0

@svick:是的,我可以始終如一地重現那種行爲。感覺像是有一個線程計時問題被斷點改變了。我的理解是,所寫的代碼應該是確定性的。 – 2013-02-27 20:39:44

回答

1

你的代碼是因爲你處理完成的方式不確定性。事件的可能的順序是這樣的:

  1. Task處理整個文件,並呼籲batcherComplete()
  2. batcher處理最後一批,發送到searcher並完成。
  3. 繼續執行,在searcheruiResult上調用Complete()
  4. 由於uiResult沒有工作要做,它完成。
  5. searcher處理最後一批,嘗試將每個結果發送到uiResult。但uiResult已經完成,所以它拒絕了一切。這意味着Post()返回false,但你沒有檢查。

所以問題是,你正試圖發送一些東西到已經完成的塊,這是行不通的。

解決方法是在塊完成之前(即其Completion完成)僅在塊之後調用Complete()。可能最簡單的方法是使用PropagateCompletionLinkTo()

+0

我現在看到你指出的問題,但不明白解決方案。 PropagateCompletion的文檔有點亮。我如何重構'batcher.Completion周圍的代碼。ContinueWith'正確處理錯誤,但等待'searcher'和'uiResult'完成現有工作? – 2013-02-27 22:27:25

+0

@EricJ。就像我說的,不要使用'ContinueWith()',而是使用'PropagateCompletion'來代替。當時間到時,這將完成或錯誤地處理下面的塊。如果你想繼續使用'COntinueWith()',你需要兩個continuation:一個用於完成'searcher'的batcher.Completion'和另一個用於完成'uiResult'的searcher.Completion'。 – svick 2013-02-27 22:37:29

+0

謝謝,我解決了我的問題。爲了確保我理解......在我的例子中'searcher'和'uiResult'都是'ActionBlock 's,所以我不能使用LinkTo(既不實現'IPropagatorBlock ')。我通過在配送器 - >搜索器鏈接中使用PropogateCompletion來解決這個問題。我是否也可以將'searcher'改成鏈接到'uiResult'的'TransformManyBlock ',並返回一個空枚舉(對於不匹配搜索的行)或1個元素的枚舉(對於匹配的行)? – 2013-03-01 18:19:07