當發佈到ActionBlock時,我們看到意外的行爲,即使MaxDegreeOfParallelism爲1,似乎也會發生並行處理。以下是方案。ActionBlock似乎有時會忽略MaxDegreeOfParallelism
該職位的ActionBlock看起來像這樣的類:
public class ByteHandler {
...
public ByteHandler(ByteHandlingCallback delegateCallback){
_byteHandlingActionBlock = new ActionBlock<byte[]>(bytes => delegateCallback.HandleBytes(bytes));
}
public void HandleBytes(byte[] bytes){
_byteHandlingActionBlock.Post(bytes);
}
下游,我們反序列化的字節爲對象,通過這些對象(姑且稱之爲通知),根據不同的類型來處理:
public class NotificationHandler{
private readonly Dictionary<string, AutoResetEvent> _secondNoticeReceivedEvents;
public void HandleInitialNotification(InitialNotification notification){
var secondNoticeReceivedEvent = new AutoResetEvent(false);
if (!_secondNoticeReceivedEvents.TryAdd(notification.ID, secondNoticeReceivedEvent)) return;
DoSomethingDownstream(notification);
if (secondNoticeReceivedEvent.WaitOne(_timeout))
DoSomethingElseDownstream();
else
throw new Exception("Second notification not received within timeout!");
_secondNoticeReceivedEvents.TryRemove(notification.ID, out secondNoticeReceivedEvent);
}
public void HandleSecondNotification(SecondNotification notification){
AutoResetEvent secondNoticeReceivedEvent;
if (_secondNoticeReceivedEvents.TryRemove(notification.ID, out secondNoticeReceivedEvent))
secondNoticeReceivedEvent.Set();
}
該處理程序有一個致命錯誤:InitialNotifications在它們相應的SecondNotifications之前進入,但HandleInitialNotification在退出之前等待HandleSecondNotification,因此線程永遠不會到達HandleSecondNotification。
一般來說,我們看到HandleInitialNotification阻塞,直到等待HandleSecondNotification超時爲止,然後繼續執行在同一個線程上處理未決的SecondNotification。這是我們在日誌中通常看到:
2013-07-05 13:27:25,755 [13] INFO Received InitialNotification for: XX
2013-07-05 13:27:35,758 [13] WARN Second notification not not received for XX within timeout!
2013-07-05 13:27:35,761 [13] INFO Received SecondNotification for: XX
這不是代碼的目的是工作的方式,但由於它是書面的方式,它應該總是超時等待SecondNotification。但是,我們也偶爾看到HandleInitialNotification超時之前完成,與HandleSecondNotification及時被處理在不同的線程:
2013-07-05 13:38:13,232 [56] INFO Received InitialNotification for: YY
2013-07-05 13:38:13,258 [11] INFO Received SecondNotification for: YY
由於我們使用的是默認ActionBlock,該MaxDegreeOfParallelism應爲1。它是如何,那麼當發佈到ActionBlock的原始線程被阻塞時,第二個線程(源自ActionBlock)可以獲取SecondNotification?
因爲所有的抽象層次,我很難理解你的代碼。而且你沒有包含最重要的代碼:'ByteHandlingCallback'的實現。您能儘可能簡化您的代碼,然後發佈一個簡短但完整的示例代碼來說明您的問題嗎? – svick
抱歉沒有回覆;不幸的是,我不再能夠訪問這段代碼來提供ByteHandlingCallback的實現。不過,我會嘗試編輯以實現簡化。 –
我正在運行在同一個問題atm – ekalchev