2013-02-26 99 views
2

該場景如下: 有一些低優先級線程可以被高優先級線程中斷。每當高優先級線程要求低優先級線程暫停時,它們將進入Wait狀態(如果它們尚未處於等待狀態)。然而,當高優先級的線程表示低優先級線程可以通知Resume時,低優先級線程不應該繼續,直到要求低優先級線程暫停的所有高優先級線程已經同意爲止。管理.net中的高/低優先級線程

爲了解決這個問題,我保持跟蹤Pause()從高優先級線程調用計數器變量中的低優先級線程。每當高優先級線程向低優先級線程詢問Pause()時,計數器的值增加1.如果在增量後計數器的值爲1,則表示該線程不在Wait中,因此請求它進入Wait狀態。否則,只需增加counter值。相反,當一個高優先級的線程調用Resume()時,我們遞減counter的值,並且如果在遞減之後的值是0,這意味着低優先級的線程現在可以Resume

這是我的問題的簡化實現。比較操作內如果與Interlocked.XXX語句是不正確的,即

如果(Interlocked.Increment(參照_remain)== 1)

,作爲讀/修改和比較操作不是原子。

我在這裏錯過了什麼?我不想使用線程優先級。

using System; 
using System.Collections.Generic; 
using System.Threading; 

namespace TestConcurrency 
{ 

// I borrowed this class from Joe Duffy's blog and modified it 
public class LatchCounter 
{ 
private long _remain; 
private EventWaitHandle m_event; 
private readonly object _lockObject; 

public LatchCounter() 
{ 
    _remain = 0; 
    m_event = new ManualResetEvent(true); 
    _lockObject = new object(); 
} 

public void Check() 
{ 
    if (Interlocked.Read(ref _remain) > 0) 
    { 
     m_event.WaitOne(); 
    } 
} 

public void Increment() 
{ 
    lock(_lockObject) 
    { 
     if (Interlocked.Increment(ref _remain) == 1) 
      m_event.Reset(); 
    } 
} 

public void Decrement() 
{ 
    lock(_lockObject) 
    { 
     // The last thread to signal also sets the event. 
     if (Interlocked.Decrement(ref _remain) == 0) 
      m_event.Set(); 
    } 
} 
} 



public class LowPriorityThreads 
{ 
private List<Thread> _threads; 
private LatchCounter _latch; 
private int _threadCount = 1; 

internal LowPriorityThreads(int threadCount) 
{ 
    _threadCount = threadCount; 
    _threads = new List<Thread>(); 
    for (int i = 0; i < _threadCount; i++) 
    { 
     _threads.Add(new Thread(ThreadProc)); 
    } 

    _latch = new CountdownLatch(); 
} 


public void Start() 
{ 
    foreach (Thread t in _threads) 
    { 
     t.Start(); 
    } 
} 

void ThreadProc() 
{ 
    while (true) 
    { 
     //Do something 
     Thread.Sleep(Rand.Next()); 
     _latch.Check(); 
    } 
} 

internal void Pause() 
{ 
    _latch.Increment(); 
} 

internal void Resume() 
{ 
    _latch.Decrement(); 
} 
} 


public class HighPriorityThreads 
{ 
private Thread _thread; 
private LowPriorityThreads _lowPriorityThreads; 

internal HighPriorityThreads(LowPriorityThreads lowPriorityThreads) 
{ 
    _lowPriorityThreads = lowPriorityThreads; 
    _thread = new Thread(RandomlyInterruptLowPriortyThreads); 
} 

public void Start() 
{ 
    _thread.Start(); 
} 

void RandomlyInterruptLowPriortyThreads() 
{ 
    while (true) 
    { 
     Thread.Sleep(Rand.Next()); 

     _lowPriorityThreads.Pause(); 

     Thread.Sleep(Rand.Next()); 
     _lowPriorityThreads.Resume(); 
    } 
} 
} 

class Program 
{ 
    static void Main(string[] args) 
    { 
    LowPriorityThreads lowPriorityThreads = new LowPriorityThreads(3); 
    HighPriorityThreads highPriorityThreadOne = new HighPriorityThreads(lowPriorityThreads); 
    HighPriorityThreads highPriorityThreadTwo = new HighPriorityThreads(lowPriorityThreads); 

    lowPriorityThreads.Start(); 
    highPriorityThreadOne.Start(); 
    highPriorityThreadTwo.Start(); 
} 
} 


class Rand 
{ 
internal static int Next() 
{ 
    // Guid idea has been borrowed from somewhere on StackOverFlow coz I like it 
    return new System.Random(Guid.NewGuid().GetHashCode()).Next() % 30000; 
} 
} 
+0

爲什麼你就不能修改檢查做'm_event.WaitOne()'沒有別的什麼嗎? – usr 2013-02-26 12:14:10

+2

不使用Thread.Priority是一個嚴重的錯誤。你將調試僵局,直到母牛回家。 – 2013-02-26 12:19:48

+0

有些東西對這個要求聞起來有些「異味」,但我現在還不能完全掌握它。 – 2013-02-26 12:40:32

回答

0

我不知道您的要求,因此我不會在這裏討論它們。 就實現過程而言,我會引入一個「調度程序」類,它將處理線程間交互,並且還作爲「可運行」對象的工廠。

這個實現當然是非常粗糙和開放的批評。

class Program 
{ 
    static void Main(string[] args) 
    { 
     ThreadDispatcher td=new ThreadDispatcher(); 
     Runner r1 = td.CreateHpThread(d=>OnHpThreadRun(d,1)); 
     Runner r2 = td.CreateHpThread(d => OnHpThreadRun(d, 2)); 

     Runner l1 = td.CreateLpThread(d => Console.WriteLine("Running low priority thread 1")); 
     Runner l2 = td.CreateLpThread(d => Console.WriteLine("Running low priority thread 2")); 
     Runner l3 = td.CreateLpThread(d => Console.WriteLine("Running low priority thread 3")); 


     l1.Start(); 
     l2.Start(); 
     l3.Start(); 

     r1.Start(); 
     r2.Start(); 

     Console.ReadLine(); 

     l1.Stop(); 
     l2.Stop(); 
     l3.Stop(); 

     r1.Stop(); 
     r2.Stop(); 
    } 

    private static void OnHpThreadRun(ThreadDispatcher d,int number) 
    { 
     Random r=new Random(); 
     Thread.Sleep(r.Next(100,2000)); 
     d.CheckedIn(); 
     Console.WriteLine(string.Format("*** Starting High Priority Thread {0} ***",number)); 
     Thread.Sleep(r.Next(100, 2000)); 
     Console.WriteLine(string.Format("+++ Finishing High Priority Thread {0} +++", number)); 
     Thread.Sleep(300); 
     d.CheckedOut();   
    } 
} 

public abstract class Runner 
{ 
    private Thread _thread; 
    protected readonly Action<ThreadDispatcher> _action; 
    private readonly ThreadDispatcher _dispathcer; 
    private long _running; 
    readonly ManualResetEvent _stopEvent=new ManualResetEvent(false); 
    protected Runner(Action<ThreadDispatcher> action,ThreadDispatcher dispathcer) 
    { 
     _action = action; 
     _dispathcer = dispathcer; 
    } 

    public void Start() 
    { 
     _thread = new Thread(OnThreadStart); 
     _running = 1; 
     _thread.Start(); 
    } 

    public void Stop() 
    { 
     _stopEvent.Reset(); 
     Interlocked.Exchange(ref _running, 0); 
     _stopEvent.WaitOne(2000); 
     _thread = null; 
     Console.WriteLine("The thread has been stopped."); 

    } 
    protected virtual void OnThreadStart() 
    { 
     while (Interlocked.Read(ref _running)!=0) 
     { 
      OnStartWork(); 
      _action.Invoke(_dispathcer); 
      OnFinishWork(); 
     } 
     OnFinishWork(); 
     _stopEvent.Set(); 
    } 

    protected abstract void OnStartWork(); 
    protected abstract void OnFinishWork(); 
} 

public class ThreadDispatcher 
{ 
    private readonly ManualResetEvent _signal=new ManualResetEvent(true); 
    private int _hpCheckedInThreads; 
    private readonly object _lockObject = new object(); 

    public void CheckedIn() 
    { 
     lock(_lockObject) 
     { 
      _hpCheckedInThreads++; 
      _signal.Reset(); 
     } 
    } 
    public void CheckedOut() 
    { 
     lock(_lockObject) 
     { 
      if(_hpCheckedInThreads>0) 
       _hpCheckedInThreads--; 
      if (_hpCheckedInThreads == 0) 
       _signal.Set(); 
     } 
    } 

    private class HighPriorityThread:Runner 
    { 
     public HighPriorityThread(Action<ThreadDispatcher> action, ThreadDispatcher dispatcher) : base(action,dispatcher) 
     { 
     } 

     protected override void OnStartWork() 
     { 
     } 

     protected override void OnFinishWork() 
     { 
     } 
    } 
    private class LowPriorityRunner:Runner 
    { 
     private readonly ThreadDispatcher _dispatcher; 
     public LowPriorityRunner(Action<ThreadDispatcher> action, ThreadDispatcher dispatcher) 
      : base(action, dispatcher) 
     { 
      _dispatcher = dispatcher; 
     } 

     protected override void OnStartWork() 
     { 
      Console.WriteLine("LP Thread is waiting for a signal."); 
      _dispatcher._signal.WaitOne(); 
      Console.WriteLine("LP Thread got the signal."); 
     } 

     protected override void OnFinishWork() 
     { 

     } 
    } 

    public Runner CreateLpThread(Action<ThreadDispatcher> action) 
    { 
     return new LowPriorityRunner(action, this); 
    } 

    public Runner CreateHpThread(Action<ThreadDispatcher> action) 
    { 
     return new HighPriorityThread(action, this); 
    } 
} 

}

相關問題