2

我正在學習TPL Dataflow圖書館。到目前爲止,這正是我所期待的。Parallel.For SendAsync緩衝區塊到異步轉換?

我創建了一個簡單的類(下文)執行以下功能

  • 後的ImportPropertiesForBranch執行我去一個第三方的API,並得到
  • ,則返回XML列表屬性的列表並反序列化成屬性數據數組(id,api endpoint,lastupdated)。有大約400多個房產(如房屋)。
  • 我然後用Parallel.ForSendAsync屬性數據到我propertyBufferBlock
  • propertyBufferBlock鏈接到propertyXmlBlock(這本身就是TransformBlock)。
  • propertyXmlBlock然後(異步)返回到API(使用屬性數據中提供的api端點)並獲取屬性xml進行反序列化。
  • 一旦xml​​返回並變爲可用,我們可以反序列化
  • 稍後,我將添加更多TransformBlock以將它保存到數據存儲。

所以我的問題是;

  • 是否有任何潛在的瓶頸或代碼區域可能會很麻煩?我知道我沒有包含任何錯誤處理或取消(這是未來)。
  • 是不是await異步調用TransformBlock或這是一個 的瓶頸?
  • 儘管代碼有效,但我擔心Parallel.For,BufferBlockTransformBlock中的異步緩衝和異步。我不確定它是最好的方式,我可能會混淆一些概念。

歡迎任何指導,改進和陷阱建議。

using System.Diagnostics; 
using System.Threading.Tasks; 
using System.Threading.Tasks.Dataflow; 
using My.Interfaces; 
using My.XmlService.Models; 

namespace My.ImportService 
{ 
    public class ImportService 
    { 

     private readonly IApiService _apiService; 
     private readonly IXmlService _xmlService; 
     private readonly IRepositoryService _repositoryService; 

     public ImportService(IApiService apiService, 
      IXmlService xmlService, 
      IRepositoryService repositoryService) 
     { 
      _apiService = apiService; 
      _xmlService = xmlService; 
      _repositoryService = repositoryService; 

      ConstructPipeline(); 
     } 

     private BufferBlock<propertiesProperty> propertyBufferBlock; 
     private TransformBlock<propertiesProperty, string> propertyXmlBlock; 
     private TransformBlock<string, propertyType> propertyDeserializeBlock; 
     private ActionBlock<propertyType> propertyCompleteBlock; 

     public async Task<bool> ImportPropertiesForBranch(string branchName, int branchUrlId) 
     { 
      var propertyListXml = await _apiService.GetPropertyListAsync(branchUrlId); 

      if (string.IsNullOrEmpty(propertyListXml)) 
       return false; 

      var properties = _xmlService.DeserializePropertyList(propertyListXml); 

      if (properties?.property == null || properties.property.Length == 0) 
       return false; 

      // limited to the first 20 for testing 
      Parallel.For(0, 20, 
       new ParallelOptions {MaxDegreeOfParallelism = 3}, 
       i => propertyBufferBlock.SendAsync(properties.property[i])); 

      propertyBufferBlock.Complete(); 

      await propertyCompleteBlock.Completion; 

      return true; 
     } 

     private void ConstructPipeline() 
     { 
      propertyBufferBlock = GetPropertyBuffer(); 
      propertyXmlBlock = GetPropertyXmlBlock(); 
      propertyDeserializeBlock = GetPropertyDeserializeBlock(); 
      propertyCompleteBlock = GetPropertyCompleteBlock(); 

      propertyBufferBlock.LinkTo(
       propertyXmlBlock, 
       new DataflowLinkOptions {PropagateCompletion = true}); 

      propertyXmlBlock.LinkTo(
       propertyDeserializeBlock, 
       new DataflowLinkOptions {PropagateCompletion = true}); 

      propertyDeserializeBlock.LinkTo(
       propertyCompleteBlock, 
       new DataflowLinkOptions {PropagateCompletion = true}); 
     } 

     private BufferBlock<propertiesProperty> GetPropertyBuffer() 
     { 
      return new BufferBlock<propertiesProperty>(); 
     } 

     private TransformBlock<propertiesProperty, string> GetPropertyXmlBlock() 
     { 
      return new TransformBlock<propertiesProperty, string>(async propertiesProperty => 
       { 
        Debug.WriteLine($"getting xml {propertiesProperty.prop_id}"); 
        var propertyXml = await _apiService.GetXmlAsStringAsync(propertiesProperty.url); 
        return propertyXml; 
       }, 
       new ExecutionDataflowBlockOptions 
       { 
        MaxDegreeOfParallelism = 1, 
        BoundedCapacity = 2 
       }); 
     } 

     private TransformBlock<string, propertyType> GetPropertyDeserializeBlock() 
     { 
      return new TransformBlock<string, propertyType>(xmlAsString => 
       { 
        Debug.WriteLine($"deserializing"); 
        var propertyType = _xmlService.DeserializeProperty(xmlAsString); 
        return propertyType; 
       }, 
       new ExecutionDataflowBlockOptions 
       { 
        MaxDegreeOfParallelism = 1, 
        BoundedCapacity = 2 
       }); 
     } 

     private ActionBlock<propertyType> GetPropertyCompleteBlock() 
     { 
      return new ActionBlock<propertyType>(propertyType => 
       { 
        Debug.WriteLine($"complete {propertyType.id}"); 
        Debug.WriteLine(propertyType.address.display); 
       }, 
       new ExecutionDataflowBlockOptions 
       { 
        MaxDegreeOfParallelism = 1, 
        BoundedCapacity = 2 
       }); 
     } 
    } 
} 

回答

1

是否有代碼,可能是麻煩的任何潛在的瓶頸或區域?

一般而言,您的方法看起來不錯,並且潛在的瓶頸是您正在限制與MaxDegreeOfParallelism = 1並行處理塊。根據問題的描述,每個項目可以獨立處理,這就是爲什麼您一次可以處理多個項目的原因。

可以等待TransformBlock內部的異步呼叫,或者這是一個瓶頸嗎?

這很好,因爲TPL DataFlow支持異步操作。

雖然代碼工作,我擔心的Parallel.ForBufferBlockTransformBlock緩衝和asyncronsity和異步。我不確定它是最好的方式,我可能會混淆一些概念。

之一,在你的代碼中潛在的問題,可以使你搬起石頭砸自己的腳被調用Parallel.For異步方法,然後調用propertyBufferBlock.Complete();。這裏的問題是Parallel.For不支持異步操作,您調用它的方式將調用propertyBufferBlock.SendAsync並在返回的任務完成之前繼續。這意味着在Parallel.For退出時,某些操作可能仍處於運行狀態,並且項目尚未添加到緩衝區塊中。如果您接着致電propertyBufferBlock.Complete();那些待處理項目將拋出異常並且項目不會被添加到處理中。你會得到不可觀察的例外。

您可以使用ForEachAsync表格this blog post確保在完成塊之前將所有項目添加到塊中。但是,如果您仍然將處理限制爲1次操作,則可以一次添加一個項目。我不知道如何實現propertyBufferBlock.SendAsync,但它可能會在內部限制一次添加一個項目,因此並行添加沒有任何意義。

+1

謝謝你的回答。很高興知道我在正確的軌道上。經過一些實驗後,我已經爲'foreach'刪除了'Parallel.For',現在只是'propertyBufferBlock.Post(property);'。我也提高了'MaxDegreeOfParallelism'和'BoundedCapacity'的效果。我相信我唯一的限制是現在的第三方Api速度。 – madebybear

+1

@madebybear,我會建議不要使用'bufferBlock.Post',因爲這是阻塞方法,只有當該項目被接受處理時纔會返回。我寧願建議使用'bufferBlock.SendAsync'並等待它在一個循環中。它將釋放一個線程並增加應用程序的整體吞吐量。 –

+1

謝謝你指出這一點。我已經將代碼更改爲一個簡單的'foreach',並在循環內部用'await bufferBlock.SendAsync(property)'指向緩衝區。 – madebybear

2

你做一些實際的東西,在一個錯誤的方式:

i => propertyBufferBlock.SendAsync(properties.property[i]) 

您需要await方法,否則你創建了太多的同時任務。

而且這條線:

MaxDegreeOfParallelism = 1 

會限制你塊隨之執行,這會降低你的表現的執行。

正如在評論說,你切換到同步方法Post並通過設置BoundedCapacity限於塊的容量。這個變體應該謹慎使用,因爲你需要檢查它的返回值,說明消息已被接受與否。

至於你在等待塊內部async方法的擔心 - 這絕對沒問題,應該像在其他情況下一樣完成async方法的使用。

+0

謝謝你的迴應。 @ andrii-litvinov指出了這一點,並且我已經將一個簡單的'foreach'與循環內的'await sendAsync'切換到緩衝區。 – madebybear