Parallel.For SendAsync to BufferBlock to async TransformBlock?
I'm learning the TPL Dataflow library. So far it is exactly what I was looking for.
I've created a simple class (below) that does the following:
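- When ImportPropertiesForBranch executes, I call a third-party API and get back a list of properties. An XML list is returned and deserialized into an array of property data (id, api endpoint, lastupdated). There are around 400+ properties (as in houses).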
- I then use Parallel.For to SendAsync the property data into my propertyBufferBlock.
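- The propertyBufferBlock is linked to a propertyXmlBlock (itself a TransformBlock). The propertyXmlBlock then (asynchronously) calls back to the API, using the api endpoint supplied in the property data, and fetches the property XML for deserialization.
- Once the XML is returned and becomes available, we can deserialize it (in propertyDeserializeBlock).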
- Later, I will add more TransformBlocks to save it to a data store.
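The pipeline described in these steps can be sketched as a small self-contained program. This is a minimal sketch under stated assumptions, not the poster's code: it assumes TPL Dataflow (System.Threading.Tasks.Dataflow) is available to the project, PropertySummary and FakeGetXmlAsync are hypothetical stand-ins for propertiesProperty and _apiService.GetXmlAsStringAsync, and "deserialization" is reduced to reading an id attribute with XElement. It shows the linking with PropagateCompletion and how completing the head block lets you await the tail block's Completion.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Xml.Linq;

// Hypothetical stand-in for the question's propertiesProperty (id + api endpoint).
public record PropertySummary(int Id, string Url);

public static class PipelineSketch
{
    // Hypothetical fake of _apiService.GetXmlAsStringAsync: waits briefly, then returns XML.
    private static async Task<string> FakeGetXmlAsync(PropertySummary p)
    {
        await Task.Delay(10); // simulated network latency
        return $"<property id=\"{p.Id}\" url=\"{p.Url}\" />";
    }

    public static async Task<List<int>> RunAsync(IEnumerable<PropertySummary> properties)
    {
        var results = new List<int>();
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        var buffer = new BufferBlock<PropertySummary>();

        // Fetch stage: up to 4 API calls may be awaiting at the same time.
        var fetchXml = new TransformBlock<PropertySummary, string>(
            p => FakeGetXmlAsync(p),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, BoundedCapacity = 8 });

        // Stand-in for DeserializeProperty: parse the XML and read the id attribute.
        var deserialize = new TransformBlock<string, int>(
            xml => (int)XElement.Parse(xml).Attribute("id"),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 8 });

        // Default MaxDegreeOfParallelism is 1, so results.Add never runs concurrently.
        var complete = new ActionBlock<int>(
            id => results.Add(id),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 8 });

        buffer.LinkTo(fetchXml, link);
        fetchXml.LinkTo(deserialize, link);
        deserialize.LinkTo(complete, link);

        foreach (var p in properties)
            await buffer.SendAsync(p);

        buffer.Complete();          // completion propagates along the links
        await complete.Completion;  // finishes once every item has passed through all stages
        return results;
    }
}
```

Because a TransformBlock preserves input order by default, the ids arrive at the last block in the order they were sent, even though the fetch stage runs several calls at once.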
So my questions are:
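- Are there any potential bottlenecks or areas of the code that could be troublesome? I'm aware I haven't included any error handling or cancellation (that is to come).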
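- Is it OK to await async calls inside a TransformBlock, or is that a bottleneck?
- Although the code works, I'm worried about the Parallel.For, the BufferBlock and the asynchrony in the TransformBlock. I'm not sure this is the best way, and I may be mixing up some concepts.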
Any guidance, improvements and advice on pitfalls are welcome.
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using My.Interfaces;
using My.XmlService.Models;

namespace My.ImportService
{
    public class ImportService
    {
        private readonly IApiService _apiService;
        private readonly IXmlService _xmlService;
        private readonly IRepositoryService _repositoryService;

        public ImportService(IApiService apiService,
            IXmlService xmlService,
            IRepositoryService repositoryService)
        {
            _apiService = apiService;
            _xmlService = xmlService;
            _repositoryService = repositoryService;
            ConstructPipeline();
        }

        private BufferBlock<propertiesProperty> propertyBufferBlock;
        private TransformBlock<propertiesProperty, string> propertyXmlBlock;
        private TransformBlock<string, propertyType> propertyDeserializeBlock;
        private ActionBlock<propertyType> propertyCompleteBlock;

        public async Task<bool> ImportPropertiesForBranch(string branchName, int branchUrlId)
        {
            var propertyListXml = await _apiService.GetPropertyListAsync(branchUrlId);
            if (string.IsNullOrEmpty(propertyListXml))
                return false;

            var properties = _xmlService.DeserializePropertyList(propertyListXml);
            if (properties?.property == null || properties.property.Length == 0)
                return false;

            // limited to the first 20 for testing
            Parallel.For(0, 20,
                new ParallelOptions {MaxDegreeOfParallelism = 3},
                i => propertyBufferBlock.SendAsync(properties.property[i]));

            propertyBufferBlock.Complete();
            await propertyCompleteBlock.Completion;
            return true;
        }

        private void ConstructPipeline()
        {
            propertyBufferBlock = GetPropertyBuffer();
            propertyXmlBlock = GetPropertyXmlBlock();
            propertyDeserializeBlock = GetPropertyDeserializeBlock();
            propertyCompleteBlock = GetPropertyCompleteBlock();

            propertyBufferBlock.LinkTo(
                propertyXmlBlock,
                new DataflowLinkOptions {PropagateCompletion = true});
            propertyXmlBlock.LinkTo(
                propertyDeserializeBlock,
                new DataflowLinkOptions {PropagateCompletion = true});
            propertyDeserializeBlock.LinkTo(
                propertyCompleteBlock,
                new DataflowLinkOptions {PropagateCompletion = true});
        }

        private BufferBlock<propertiesProperty> GetPropertyBuffer()
        {
            return new BufferBlock<propertiesProperty>();
        }

        private TransformBlock<propertiesProperty, string> GetPropertyXmlBlock()
        {
            return new TransformBlock<propertiesProperty, string>(async propertiesProperty =>
                {
                    Debug.WriteLine($"getting xml {propertiesProperty.prop_id}");
                    var propertyXml = await _apiService.GetXmlAsStringAsync(propertiesProperty.url);
                    return propertyXml;
                },
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 1,
                    BoundedCapacity = 2
                });
        }

        private TransformBlock<string, propertyType> GetPropertyDeserializeBlock()
        {
            return new TransformBlock<string, propertyType>(xmlAsString =>
                {
                    Debug.WriteLine($"deserializing");
                    var propertyType = _xmlService.DeserializeProperty(xmlAsString);
                    return propertyType;
                },
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 1,
                    BoundedCapacity = 2
                });
        }

        private ActionBlock<propertyType> GetPropertyCompleteBlock()
        {
            return new ActionBlock<propertyType>(propertyType =>
                {
                    Debug.WriteLine($"complete {propertyType.id}");
                    Debug.WriteLine(propertyType.address.display);
                },
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 1,
                    BoundedCapacity = 2
                });
        }
    }
}
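On the question of whether awaiting inside a TransformBlock is a bottleneck: a pending await does not hold a thread. What caps throughput is the block's MaxDegreeOfParallelism (1 in GetPropertyXmlBlock above), which limits how many delegate invocations may be in flight at once, so with 1 the API calls run one after another. The sketch below is illustrative only: Task.Delay is a hypothetical stand-in for the API call, and the delays are deliberately longer for earlier items. It records the peak number of concurrent invocations and checks that output order is preserved.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class AsyncTransformDemo
{
    // Returns the outputs (in the order the sink received them) and the peak
    // number of delegate invocations that were awaiting at the same time.
    public static async Task<(List<int> Outputs, int PeakConcurrency)> RunAsync(int itemCount, int maxDop)
    {
        int inFlight = 0, peak = 0;

        var fetch = new TransformBlock<int, int>(async i =>
            {
                int now = Interlocked.Increment(ref inFlight);
                // Raise 'peak' to 'now' if it is higher (lock-free compare-and-swap loop).
                int seen;
                while (now > (seen = Volatile.Read(ref peak)))
                    Interlocked.CompareExchange(ref peak, now, seen);

                // Simulated I/O: earlier items wait longer, so they finish out of order internally.
                await Task.Delay(20 + (itemCount - i) * 5);
                Interlocked.Decrement(ref inFlight);
                return i;
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDop });

        var outputs = new List<int>();
        var sink = new ActionBlock<int>(x => outputs.Add(x)); // single-threaded by default
        fetch.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < itemCount; i++)
            await fetch.SendAsync(i);

        fetch.Complete();
        await sink.Completion;
        return (outputs, peak);
    }
}
```

Raising MaxDegreeOfParallelism on the fetch block is therefore the main lever for an I/O-bound stage; the async delegate itself is the idiomatic way to call an async API from a TransformBlock.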
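Thank you for your answer. Good to know I'm on the right track. After some experimenting I've dropped the 'Parallel.For' in favour of a 'foreach' and now simply call 'propertyBufferBlock.Post(property);'. I've also increased 'MaxDegreeOfParallelism' and 'BoundedCapacity' to good effect. I believe my only limitation now is the speed of the third-party API. – madebybear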
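@madebybear, I'd advise against using 'bufferBlock.Post', since it is a blocking method that returns only once the item has been accepted for processing. I'd rather suggest using 'bufferBlock.SendAsync' and awaiting it in a loop. That frees up a thread and increases the overall throughput of the application. –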
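One concrete difference between the two calls shows up with a bounded target. Post returns immediately with true or false depending on whether the target accepted the item; if a bounded block is full it returns false and the item is simply not delivered. SendAsync returns a Task<bool> that completes once the target accepts (or permanently declines) the item. In the question's pipeline the head BufferBlock is unbounded, so Post always succeeds there; the difference matters once BoundedCapacity is set on the block you post to. A minimal sketch, assuming an ActionBlock with BoundedCapacity = 1 whose handler waits on a hypothetical gate:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public record Outcome(bool FirstPost, bool SecondPost, bool SendPendingWhileFull, bool SendAccepted, List<int> Processed);

public static class PostVsSendAsync
{
    public static async Task<Outcome> RunAsync()
    {
        // The handler waits on this gate, so the block stays full until it is released.
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        var processed = new List<int>();

        // BoundedCapacity = 1: the item currently being processed occupies the only slot.
        var block = new ActionBlock<int>(async x =>
            {
                await gate.Task;
                processed.Add(x);
            },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        bool firstPost = block.Post(1);        // accepted: the block had room
        bool secondPost = block.Post(2);       // declined at once: item 2 is never delivered
        Task<bool> send = block.SendAsync(3);  // postponed: the task waits for a free slot
        bool pending = !send.IsCompleted;

        gate.SetResult(true);                  // let item 1 finish, which frees the slot
        bool accepted = await send;            // item 3 is now taken by the block

        block.Complete();
        await block.Completion;
        return new Outcome(firstPost, secondPost, pending, accepted, processed);
    }
}
```

Here the block processes items 1 and 3 only; item 2 was lost because Post's false result was ignored.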
Thanks for pointing that out. I've changed the code to a simple 'foreach' that sends to the buffer with 'await bufferBlock.SendAsync(property)' inside the loop. – madebybear
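The revised producer (a plain foreach awaiting SendAsync) pays off once the head block is bounded as well: the loop is throttled while downstream stages are busy, instead of queuing everything in memory. A minimal sketch, assuming BoundedCapacity = 2 on both blocks and a gated ActionBlock as a hypothetical stand-in for the slow API stage:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BackpressureDemo
{
    public static async Task<(int SentWhileBlocked, List<int> Processed)> RunAsync(int itemCount)
    {
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        var processed = new List<int>();
        int sent = 0;

        var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 2 });
        var consumer = new ActionBlock<int>(async x =>
            {
                await gate.Task;   // stand-in for a slow downstream stage (e.g. the third-party API)
                processed.Add(x);  // MaxDegreeOfParallelism defaults to 1, so no concurrent Add
            },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
        buffer.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

        // The revised producer: a plain loop that awaits SendAsync for each item.
        Task producer = Task.Run(async () =>
        {
            for (int i = 0; i < itemCount; i++)
            {
                // Waits without blocking a thread while both bounded blocks are full.
                // A real pipeline should check the bool: false means the block was completed or faulted.
                await buffer.SendAsync(i);
                Interlocked.Increment(ref sent);
            }
            buffer.Complete();
        });

        await Task.Delay(200);                 // give the producer time to fill the pipeline
        int sentWhileBlocked = Volatile.Read(ref sent);

        gate.SetResult(true);                  // release the downstream stage
        await producer;
        await consumer.Completion;
        return (sentWhileBlocked, processed);
    }
}
```

While the gate is closed, at most four sends (two slots per bounded block) can complete; once it opens, the loop resumes and every item is processed in order.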