2015-01-26 78 views
0

我定期進入我認爲是parallel.ForEach循環內的競態條件。我這樣說是因爲它總是掛在代碼的那一部分上。Parallel.ForEach競賽條件

try 
{ 
    Parallel.ForEach(Directory.EnumerateFiles(directory, "*.tracex", SearchOption.TopDirectoryOnly), _po, (path, ls) => 
     { 
      DebugFile file; 
      if (filterDate) 
      { 
       if (filterUser) 
       { 
        file = new DebugFile(path, startTime, endTime, user); 
       } 
       else file = new DebugFile(path, startTime, endTime); 
      } 
      else if (filterUser) 
      { 
       file = new DebugFile(path, user); 
      } 
      else file = new DebugFile(path); 
      if (!file.IsFiltered()) 
      { 
       _files.Add(file); 
      } 
      Interlocked.Increment(ref _loadCount); // increment how many we've checked 
      if (_po.CancellationToken.IsCancellationRequested) 
      { 
       ls.Break(); 
      } 
     }); 
} 
catch (OperationCanceledException oce) 
{ 
    Debug.WriteLine(oce.ToString()); 
} 

在我的_files對象中,我調用Add方法時處理鎖定。

public virtual void Add(T item) 
{ 
    _lock.EnterWriteLock(); 
    try 
    { 
     _bindingList.Add(item); 
    } 
    finally 
    { 
     _lock.ExitWriteLock(); 
    } 
    OnListChanged(new ListChangedEventArgs(ListChangedType.ItemAdded, _bindingList.Count - 1)); 
} 

任何想法我在做什麼錯在這裏?它並不是每一次,只是間歇性的。另外,至少對我而言,第一次調用代碼時不會發生。它只會發生,如果我打電話一次,然後再次調用它,通常是第二次或第三次。

謝謝!

UPDATE 我意識到我正在使用自定義任務調度程序。當我刪除它時,我不再看到掛起。我這樣做,所以我可以定製我正在運行多少個線程。我的想法是,因爲我主要通過網絡讀取文件,IO會減慢速度,所以我可以一次運行更多的任務。以下是我如何構建調度程序:

public class TaskSchedulerForSlowIO : TaskScheduler 
{ 
    /// <summary> 
    /// maximum number of tasks to run concurrently 
    /// </summary> 
    private int _maxConcurrencyLevel; 

    /// <summary> 
    /// lock for reading tasks array 
    /// </summary> 
    private ReaderWriterLockSlim _listLock = new ReaderWriterLockSlim(); 

    /// <summary> 
    /// list of tasks running 
    /// </summary> 
    private LinkedList<Task> _tasks = new LinkedList<Task>(); 

    /// <summary> 
    /// Default constructor - This will increase threadpool limits if necessary 
    /// </summary> 
    public TaskSchedulerForSlowIO() 
     : base() 
    { 
     _maxConcurrencyLevel = Environment.ProcessorCount * 10; 
     int workerThreads, ioThreads, minimumConcurrency; 
     minimumConcurrency = Environment.ProcessorCount * 2; 
     ThreadPool.GetMaxThreads(out workerThreads, out ioThreads); 
     if (workerThreads < _maxConcurrencyLevel) 
     { 
      if (ioThreads < _maxConcurrencyLevel) 
      { 
       ioThreads = _maxConcurrencyLevel; 
      } 
      ThreadPool.SetMaxThreads(_maxConcurrencyLevel, ioThreads); 
     } 
     ThreadPool.GetMinThreads(out workerThreads, out ioThreads); 
     if (workerThreads < minimumConcurrency) 
     { 
      if (ioThreads < minimumConcurrency) 
      { 
       ioThreads = minimumConcurrency; 
      } 
      ThreadPool.SetMinThreads(minimumConcurrency, ioThreads); 
     } 
    } 

    /// <summary> 
    /// Implementing TaskScheduler 
    /// </summary> 
    public override int MaximumConcurrencyLevel 
    { 
     get 
     { 
      return _maxConcurrencyLevel; 
     } 
    } 

    /// <summary> 
    /// Scheduler Implementation 
    /// </summary> 
    /// <returns>ScheduledTasks</returns> 
    protected override IEnumerable<Task> GetScheduledTasks() 
    { 
     Task[] tasks; 
     _listLock.EnterReadLock(); 
     try 
     { 
      tasks = _tasks.ToArray(); 
     } 
     finally 
     { 
      _listLock.ExitReadLock(); 
     } 
     return tasks; 
    } 

    /// <summary> 
    /// Queues the specified task 
    /// </summary> 
    /// <param name="task">Task to queue</param> 
    protected override void QueueTask(Task task) 
    { 
     int count; 
     _listLock.EnterReadLock(); 
     try 
     { 
      _tasks.AddLast(task); 
      count = _tasks.Count; 
     } 
     finally 
     { 
      _listLock.ExitReadLock(); 
     } 
     if (count <= _maxConcurrencyLevel) 
     { 
      ThreadPool.UnsafeQueueUserWorkItem(ProcessTask, task); 
     } 
    } 

    /// <summary> 
    /// Scheduler Implementation 
    /// </summary> 
    /// <param name="task">Task to remove</param> 
    /// <returns>Success</returns> 
    protected override bool TryDequeue(Task task) 
    { 
     _listLock.EnterWriteLock(); 
     try 
     { 
      return _tasks.Remove(task); 
     } 
     finally 
     { 
      _listLock.ExitWriteLock(); 
     } 
    } 

    /// <summary> 
    /// Scheduled Implementation 
    /// </summary> 
    /// <param name="task">Task to execute</param> 
    /// <param name="taskWasPreviouslyQueued">Was the task previously queued</param> 
    /// <returns></returns> 
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) 
    { 
     //We're not going to inline slow IO 
     return false; 
    } 

    void ProcessTask(object o) 
    { 
     try 
     { 
      Task t = o as Task; 
      if (t != null) 
      { 
       if (base.TryExecuteTask(t)) 
       { 
        if(!(t.IsCanceled || t.IsFaulted)) t.Wait(); 
        TryDequeue(t); 
       } 
      } 
     } 
     catch(AggregateException a) 
     { 
      var e = a.Flatten(); 
      foreach (Exception ex in e.InnerExceptions) 
      { 
       Debug.WriteLine(ex.ToString()); 
      } 
     } 
    } 
} 
+0

我會先在'Add'方法中刪除'try' /'finally'。捕獲所有異常處理使調試非常困難。這樣做,看看是否有任何東西被拋出。 – Enigmativity 2015-01-26 22:05:42

+0

@Enigmativity'try/finally'並不意味着* Catch-所有的異常*只是爲了確保'_lock.ExitWriteLock();'被執行。 – EZI 2015-01-26 22:10:06

+0

griztown,你用'lock(aSharedObj)_bindingList.Add(item);'替換你的方法時會得到什麼? – EZI 2015-01-26 22:12:43

回答

-1

可能有很多原因。 例如

1)的原點,因此突變特徵filterUserfilterDateIsFiltered() ...不明確看上的代碼,這可能引起問題。

2)通常,代碼是不可擴展的。避免並行訪問(讀取)文件,因爲IO設備(我認爲在您的情況下是硬盤)不是並行讀取設備,並且在簡單串行處理的情況下,很可能會導致性能變差。推薦:將線程關聯設置爲只有2個線程/內核(我再次假設你有更多),並進行調試,以查看會發生什麼。很有可能你會遇到衝突加劇的地步。

+0

@downvoter:原因? – Tigran 2015-01-26 22:14:21

+0

你的文章不是問題的答案。建議:不要試圖回答無法回答的問題。 – 2015-01-27 02:14:11

+0

感謝Tigran.filterUser和filterDate是布爾值,它告訴我們是否要根據關聯用戶或創建日期/時間來過濾文件。 IsFiltered是DebugFile類中的一個標誌,如果文件應該被過濾,該標誌將被設置。我會測試其他建議。 – griztown 2015-01-27 03:41:07