2014-06-24 217 views
1

我有這樣的文件數組。爲什麼parallel.Invoke在這種情況下不工作

string [] unZippedFiles; 這個想法是我想要並行解析這些文件。當他們被解析時,一個記錄被放置在一個concurrentbag上。隨着記錄越來越多,我想踢更新功能。

以下是我在我的Main()現在做的:

foreach(var file in unZippedFiles) 
    { Parallel.Invoke 
      ( 
           () => ImportFiles(file), 
           () => UpdateTest() 


          ); 
      } 

這是更新loooks的代碼等。

static void UpdateTest() 
    { 
     Console.WriteLine("Updating/Inserting merchant information."); 

     while (!merchCollection.IsEmpty || producingRecords) 
     { 
      merchant x; 
      if (merchCollection.TryTake(out x)) 
      { 

       UPDATE_MERCHANT(x.m_id, x.mInfo, x.month, x.year); 
      } 
     } 



    } 

這就是導入代碼的樣子。這幾乎是一個巨大的字符串解析器。

 System.IO.StreamReader SR = new System.IO.StreamReader(fileName); 
      long COUNTER = 0; 
      StringBuilder contents = new StringBuilder(); 
      string M_ID = ""; 

      string BOF_DELIMITER = "%%MS_SKEY_0000_000_PDF:"; 
      string EOF_DELIMITER = "%%EOF"; 

      try 
      { 
       record_count = 0; 
       producingRecords = true; 
       for (COUNTER = 0; COUNTER <= SR.BaseStream.Length - 1; COUNTER++) 
       { 
        if (SR.EndOfStream) 
        { 
         break; 
        } 
        contents.AppendLine(Strings.Trim(SR.ReadLine())); 
        contents.AppendLine(System.Environment.NewLine); 
        //contents += Strings.Trim(SR.ReadLine()); 
        //contents += Strings.Chr(10); 
        if (contents.ToString().IndexOf((EOF_DELIMITER)) > -1) 
        { 
         if (contents.ToString().StartsWith(BOF_DELIMITER) & contents.ToString().IndexOf(EOF_DELIMITER) > -1) 
         { 
          string data = contents.ToString(); 
          M_ID = data.Substring(data.IndexOf("_M") + 2, data.Substring(data.IndexOf("_M") + 2).IndexOf("_")); 
          Console.WriteLine("Merchant: " + M_ID); 
          merchant newmerch; 
          newmerch.m_id = M_ID; 
          newmerch.mInfo = data.Substring(0, (data.IndexOf(EOF_DELIMITER) + 5)); 
          newmerch.month = DateTime.Now.AddMonths(-1).Month; 
          newmerch.year = DateTime.Now.AddMonths(-1).Year; 
          //Update(newmerch); 
          merchCollection.Add(newmerch); 
         } 
         contents.Clear(); 
         //GC.Collect(); 
        } 
       } 

       SR.Close(); 
       // UpdateTest(); 

      } 
      catch (Exception ex) 
      { 
       producingRecords = false; 

      } 
      finally 
      { 
       producingRecords = false; 
      } 
     } 

我遇到的問題是更新運行一次,然後導入文件函數只是接管並不會產生更新功能。關於我做錯什麼的想法會有很大的幫助。

+0

看起來像是一個計時問題。您的'UpdateTest'在'ImportFiles'有機會將一個項目放入'metchCollection'或將'produceRecords'設置爲true之前完成。沒有看到代碼的其餘部分,這是在黑暗中的野生刺傷。 –

+0

我同意這是一個計時問題。但我想我所問的是爲什麼ImportFile不會屈服。我也將添加導入代碼。 – Waddaulookingat

+0

下面介紹如何修補它:在'foreach'範圍內聲明'merchCollection'作爲'BlockingCollection '*;完全拋棄'produceRecords'變量 - 它不適合線程同步;改變你的更新,通過'foreach(var m in merchCollection.GetConsumingEnumerable())''遍歷阻塞集合。在你的'ImportFiles''finally'塊中調用'merchCollection.CompleteAdding()',這樣你的更新就完成了。瞧! –

回答

3

這是我的刺修復你的線程同步。請注意,我沒有從功能角度改變任何代碼(除了取出catch - 這通常是一個壞主意;異常需要傳播)。

原諒如果有東西不編譯 - 我寫這個基於不完整的片段。

主要

foreach(var file in unZippedFiles) 
{ 
    using (var merchCollection = new BlockingCollection<merchant>()) 
    { 
     Parallel.Invoke 
     ( 
      () => ImportFiles(file, merchCollection), 
      () => UpdateTest(merchCollection) 
     ); 
    } 
} 

更新

private void UpdateTest(BlockingCollection<merchant> merchCollection) 
{ 
    Console.WriteLine("Updating/Inserting merchant information."); 

    foreach (merchant x in merchCollection.GetConsumingEnumerable()) 
    { 
     UPDATE_MERCHANT(x.m_id, x.mInfo, x.month, x.year); 
    } 
} 

進口

不要忘記在merchCollection以作爲參數傳遞 - 它不應該是一成不變的。

  System.IO.StreamReader SR = new System.IO.StreamReader(fileName); 
     long COUNTER = 0; 
     StringBuilder contents = new StringBuilder(); 
     string M_ID = ""; 

     string BOF_DELIMITER = "%%MS_SKEY_0000_000_PDF:"; 
     string EOF_DELIMITER = "%%EOF"; 

     try 
     { 
      record_count = 0; 

      for (COUNTER = 0; COUNTER <= SR.BaseStream.Length - 1; COUNTER++) 
      { 
       if (SR.EndOfStream) 
       { 
        break; 
       } 
       contents.AppendLine(Strings.Trim(SR.ReadLine())); 
       contents.AppendLine(System.Environment.NewLine); 
       //contents += Strings.Trim(SR.ReadLine()); 
       //contents += Strings.Chr(10); 
       if (contents.ToString().IndexOf((EOF_DELIMITER)) > -1) 
       { 
        if (contents.ToString().StartsWith(BOF_DELIMITER) & contents.ToString().IndexOf(EOF_DELIMITER) > -1) 
        { 
         string data = contents.ToString(); 
         M_ID = data.Substring(data.IndexOf("_M") + 2, data.Substring(data.IndexOf("_M") + 2).IndexOf("_")); 
         Console.WriteLine("Merchant: " + M_ID); 
         merchant newmerch; 
         newmerch.m_id = M_ID; 
         newmerch.mInfo = data.Substring(0, (data.IndexOf(EOF_DELIMITER) + 5)); 
         newmerch.month = DateTime.Now.AddMonths(-1).Month; 
         newmerch.year = DateTime.Now.AddMonths(-1).Year; 
         //Update(newmerch); 
         merchCollection.Add(newmerch); 
        } 
        contents.Clear(); 
        //GC.Collect(); 
       } 
      } 

      SR.Close(); 
      // UpdateTest(); 

     } 
     finally 
     { 
      merchCollection.CompleteAdding(); 
     } 
    } 
+0

謝謝基里爾。這看起來有希望。我在嘗試「加速這些代碼」的過程中做了大量的閱讀和大量的學習。我不知道BlockingCollection。我從來沒有想過通過ConcurrentBag – Waddaulookingat

+0

@Waddaulookingat,'BlockingCollection '的美妙之處在於它包含了開箱即用的生產者和消費者同步的方法(通過'CompleteAdding' /'GetConsumingEnumerable'),所以你不用'不用擔心擁有自己的線程同步結構 - 你可以免費獲得它們。祝你好運。 –

相關問題