2017-04-20 37 views
1

我建立了一個系統,可以無限期地從隊列中讀取消息,然後使用Rx和TPL DataFlow將它們關閉。使用DataFlow和RX的連續數據流停止處理

出於某種原因,在幾百條消息之後,ActionBlock停止運行掛起,我找不到原因。 this.GetMessages()繼續着火,但this.ProcessMessages不再。

var source = Observable 
    .Timer(TimeSpan.FromMilliseconds(1), TimeSpan.FromMilliseconds(1)) 
    .SelectMany(x => this.GetMessages()); 

var actionBlock = new ActionBlock<List<QueueStream>>(
    this.ProcessMessages, 
    new ExecutionDataflowBlockOptions 
    { 
     MaxDegreeOfParallelism = Environment.ProcessorCount * 2, 
    }); 

using (source.Subscribe(actionBlock.AsObserver())) 
{ 
    while (this.Run) 
    { 
     await Task.Delay(TimeSpan.FromSeconds(1)); 
    } 
} 

actionBlock.Complete(); 
await actionBlock.Completion; 

讀者 - 注意,這實際上繼續運行

private async Task<List<QueueStream>> GetMessages() 
{ 
    var messageList = new List<QueueStream>(); 
    var taskList = new List<Task>(); 

    // Add up to N items in our queue 
    for (var i = 0; i < 25; i++) 
    { 
     var task = this 
      .ReadAndParseQueue() 
      .ContinueWith(async queueStreamTask => 
       { 
        var queueStream = await queueStreamTask; 
        if (queueStream != null) 
        { 
         messageList.Add(queueStream); 
        } 
       }); 

     taskList.Add(task); 
    } 

    await Task.WhenAll(taskList); 

    return messageList; 
} 

作家 - 幾百個信息此停止擊中

private async Task ProcessMessages(List<QueueStream> streams) 
{ 
    var tasks = new List<Task>(); 
    foreach (var queueStream in streams) 
    { 
     tasks.Add(this.ProcessMessage(queueStream)); 
    } 

    await Task.WhenAll(tasks); 
} 
+0

你確實需要提供[mcve]。我們可以複製粘貼並運行重現此問題的代碼。 – Enigmativity

+0

@Chris一旦它出現,ActionBlock的狀態是什麼,它可能是有故障的。在'等待'完成之前,從'ProcessMessages'中拋出的任何異常都不會被觀察到,如果總是設置this.run,那麼在你的情況下永遠不會發生。 – JSteward

回答

1

確定後你的source繼續在這種情況下運行?有一個在你的代碼中的無限循環,但是,如果發生錯誤或this.Run沒有設置,它會停下來,之後,你有這些行:

actionBlock.Complete(); 
await actionBlock.Completion; 

這實際上阻止actionBlock接受新mesasges,所以ProcessMessages將永遠不會被調用,因爲消息只是被忽略。