2010-05-28 204 views
0

我開始寫我的第一個並行應用程序。此分區程序將從數據源中一次枚舉IDataReader一次拉動chunkSize記錄。我的GetEnumerator會導致死鎖嗎?

TLDR;版本

private object _Lock = new object(); 
public IEnumerator GetEnumerator() 
{ 
    var infoSource = myInforSource.GetEnumerator(); 
        //Will this cause a deadlock if two threads 
    lock (_Lock) //use the enumator at the same time? 
    { 
     while (infoSource.MoveNext()) 
     { 
      yield return infoSource.Current; 
     } 
    } 
} 

全碼

protected class DataSourcePartitioner<object[]> : System.Collections.Concurrent.Partitioner<object[]> 
{ 
    private readonly System.Data.IDataReader _Input; 
    private readonly int _ChunkSize; 
    public DataSourcePartitioner(System.Data.IDataReader input, int chunkSize = 10000) 
     : base() 
    { 
     if (chunkSize < 1) 
      throw new ArgumentOutOfRangeException("chunkSize"); 
     _Input = input; 
     _ChunkSize = chunkSize; 
    } 

    public override bool SupportsDynamicPartitions { get { return true; } } 

    public override IList<IEnumerator<object[]>> GetPartitions(int partitionCount) 
    { 

     var dynamicPartitions = GetDynamicPartitions(); 
     var partitions = 
      new IEnumerator<object[]>[partitionCount]; 

     for (int i = 0; i < partitionCount; i++) 
     { 
      partitions[i] = dynamicPartitions.GetEnumerator(); 
     } 
     return partitions; 


    } 

    public override IEnumerable<object[]> GetDynamicPartitions() 
    { 
     return new ListDynamicPartitions(_Input, _ChunkSize); 
    } 
    private class ListDynamicPartitions : IEnumerable<object[]> 
    { 
     private System.Data.IDataReader _Input; 
     int _ChunkSize; 
     private object _ChunkLock = new object(); 
     public ListDynamicPartitions(System.Data.IDataReader input, int chunkSize) 
     { 
      _Input = input; 
      _ChunkSize = chunkSize; 
     } 

     public IEnumerator<object[]> GetEnumerator() 
     { 

      while (true) 
      { 
       List<object[]> chunk = new List<object[]>(_ChunkSize); 
       lock(_Input) 
       { 
        for (int i = 0; i < _ChunkSize; ++i) 
        { 
         if (!_Input.Read()) 
          break; 
         var values = new object[_Input.FieldCount]; 
         _Input.GetValues(values); 
         chunk.Add(values); 
        } 
        if (chunk.Count == 0) 
         yield break; 
       } 
       var chunkEnumerator = chunk.GetEnumerator(); 
       lock(_ChunkLock) //Will this cause a deadlock? 
       { 
        while (chunkEnumerator.MoveNext()) 
        { 
         yield return chunkEnumerator.Current; 
        } 
       } 
      } 
     } 

     IEnumerator IEnumerable.GetEnumerator() 
     { 
      return ((IEnumerable<object[]>)this).GetEnumerator(); 
     } 
    } 
} 

我想IEnumerable對象時,它傳遞迴線程安全(在MSDN example是如此我假設PLINQ和TPL可能需要它)將於_ChunkLock附近的鎖底部幫助提供線程安全還是會導致死鎖?從文檔中我無法確定鎖是否會在yeld return上發佈。

此外,如果有內置的.NET功能,將做我想做的事情,我寧願使用它。如果您發現任何其他代碼問題,我將不勝感激。

回答

1

一句話:也許*

如果你總是使用在foreach循環的背景下,這個代碼,那麼你可能不會打僵局(除非它可能是你的myInfoSource是無限的,或者說你的foreach循環有一些代碼它永遠不會終止),儘管你可能會看到減速。

潛在的(實際上,保證的)僵局的一個更可能的原因是這樣的:

var myObject = new YourObject(); 
var enumerator = myObject.GetEnumerator(); 

// if you do this, and then forget about it... 
enumerator.MoveNext(); 

// ...your lock will never be released 

*我是你的基礎代碼的初始塊這個答案。

1

我寫了一個測試框架,它沒有死鎖,但第二個線程永遠不會獲取數據。

static void Main() 
{ 
    En en = new En(); 
    Task.Factory.StartNew(() => 
     { 
      foreach (int i in en) 
      { 
       Thread.Sleep(100); 
       Console.WriteLine("A:" + i.ToString()); 
      } 
     }); 
    Task.Factory.StartNew(() => 
    { 
     foreach (int i in en) 
     { 
      Thread.Sleep(10); 
      Console.WriteLine("B:" +i.ToString()); 
     } 
    }); 
    Console.ReadLine(); 
} 

public class En : IEnumerable 
{ 
    object _lock = new object(); 
    static int i = 0; 
    public IEnumerator GetEnumerator() 
    { 
     lock (_lock) 
     { 
      while (true) 
      { 
       if (i < 10) 
        yield return i++; 
       else 
        yield break; 
      } 
     } 
    } 
} 

返回

A:0 
A:1 
A:2 
A:3 
A:4 
A:5 
A:6 
A:7 
A:8 
A:9 

這裏是一個GetEnumerator更新版本應該正常運行。

public IEnumerator<object[]> GetEnumerator() 
{ 

    while (true) 
    { 
     List<object[]> chunk = new List<object[]>(_ChunkSize); 
     _ChunkPos = 0; 
     lock(_Input) 
     { 
      for (int i = 0; i < _ChunkSize; ++i) 
      { 
       if (!_Input.Read()) 
        break; 
       var values = new object[_Input.FieldCount]; 
       _Input.GetValues(values); 
       chunk.Add(values); 
      } 
      if (chunk.Count == 0) 
       yield break; 
     } 
     var chunkEnumerator = chunk.GetEnumerator(); 
     while (true) 
     { 
      object[] retVal; 
      lock (_ChunkLock) 
      { 
       if (chunkEnumerator.MoveNext()) 
       { 
        retVal = chunkEnumerator.Current; 
       } 
       else 
        break; //break out of chunk while loop. 
      } 
      yield return retVal; 
     } 
    } 
}