1
我建立了一個系統,可以無限期地從隊列中讀取消息,然後使用Rx和TPL DataFlow
將它們關閉。使用DataFlow和RX的連續數據流停止處理
出於某種原因,在幾百條消息之後,ActionBlock停止運行掛起,我找不到原因。 this.GetMessages()
繼續着火,但this.ProcessMessages
不再。
var source = Observable
.Timer(TimeSpan.FromMilliseconds(1), TimeSpan.FromMilliseconds(1))
.SelectMany(x => this.GetMessages());
var actionBlock = new ActionBlock<List<QueueStream>>(
this.ProcessMessages,
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount * 2,
});
using (source.Subscribe(actionBlock.AsObserver()))
{
while (this.Run)
{
await Task.Delay(TimeSpan.FromSeconds(1));
}
}
actionBlock.Complete();
await actionBlock.Completion;
讀者 - 注意,這實際上繼續運行
private async Task<List<QueueStream>> GetMessages()
{
var messageList = new List<QueueStream>();
var taskList = new List<Task>();
// Add up to N items in our queue
for (var i = 0; i < 25; i++)
{
var task = this
.ReadAndParseQueue()
.ContinueWith(async queueStreamTask =>
{
var queueStream = await queueStreamTask;
if (queueStream != null)
{
messageList.Add(queueStream);
}
});
taskList.Add(task);
}
await Task.WhenAll(taskList);
return messageList;
}
作家 - 幾百個信息此停止擊中
private async Task ProcessMessages(List<QueueStream> streams)
{
var tasks = new List<Task>();
foreach (var queueStream in streams)
{
tasks.Add(this.ProcessMessage(queueStream));
}
await Task.WhenAll(tasks);
}
你確實需要提供[mcve]。我們可以複製粘貼並運行重現此問題的代碼。 – Enigmativity
@Chris一旦它出現,ActionBlock的狀態是什麼,它可能是有故障的。在'等待'完成之前,從'ProcessMessages'中拋出的任何異常都不會被觀察到,如果總是設置this.run,那麼在你的情況下永遠不會發生。 – JSteward