我有我需要處理的項目編號列表。一個項目可能有大約8000個項目,我需要獲取項目中每個項目的數據,然後將這些數據推送到服務器列表中。任何人都可以告訴我以下...BroadcastBlock缺失項目
1)我有1000個項目在iR但只有998被寫入服務器。通過使用broadCastBlock,我的物品是否鬆動? 2)我是否正確地等待所有actionBlocks? 3)如何使數據庫調用異步?
這裏是數據庫代碼
public MemcachedDTO GetIR(MemcachedDTO dtoItem)
{
string[] Tables = new string[] { "iowa", "la" };
using (SqlConnection connection = new SqlConnection(ConfigurationManager.ConnectionStrings["test"].ConnectionString))
{
using (SqlCommand command = new SqlCommand("test", connection))
{
DataSet Result = new DataSet();
command.CommandType = CommandType.StoredProcedure;
command.Parameters.Add("@ProjectId", SqlDbType.VarChar);
command.Parameters["@ProjectId"].Value = dtoItem.ProjectId;
connection.Open();
Result.EnforceConstraints = false;
Result.Load(command.ExecuteReader(CommandBehavior.CloseConnection), LoadOption.OverwriteChanges, Tables);
dtoItem.test = Result;
}
}
return dtoItem;
}
更新: 我的代碼更新到下面。它只是在我運行它時掛起,只寫入1/4的數據到服務器?你能讓我知道我做錯了什麼嗎?
public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(IEnumerable<ITargetBlock<T>> targets, DataflowBlockOptions options)
{
var targetsList = targets.ToList();
var block = new ActionBlock<T>(
async item =>
{
foreach (var target in targetsList)
{
await target.SendAsync(item);
}
}, new ExecutionDataflowBlockOptions
{
CancellationToken = options.CancellationToken
});
block.Completion.ContinueWith(task =>
{
foreach (var target in targetsList)
{
if (task.Exception != null)
target.Fault(task.Exception);
else
target.Complete();
}
});
return block;
}
[HttpGet]
public async Task< HttpResponseMessage> ReloadItem(string projectQuery)
{
try
{
var linkCompletion = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 2
};
var cts = new CancellationTokenSource();
var dbOptions = new DataflowBlockOptions { CancellationToken = cts.Token };
IList<string> projectIds = projectQuery.Split(',').ToList();
IEnumerable<string> serverList = ConfigurationManager.AppSettings["ServerList"].Split(',').Cast<string>();
var iR = new TransformBlock<MemcachedDTO, MemcachedDTO>(
dto => dto.GetIR(dto), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 });
List<ActionBlock<MemcachedDTO>> actionList = new List<ActionBlock<MemcachedDTO>>();
List<MemcachedDTO> dtoList = new List<MemcachedDTO>();
foreach (string pid in projectIds)
{
IList<MemcachedDTO> dtoTemp = new List<MemcachedDTO>();
dtoTemp = MemcachedDTO.GetItemIdsByProject(pid);
dtoList.AddRange(dtoTemp);
}
foreach (string s in serverList)
{
var action = new ActionBlock<MemcachedDTO>(
async dto => await PostEachServerAsync(dto, s, "setitemcache"));
actionList.Add(action);
}
var bBlock = CreateGuaranteedBroadcastBlock(actionList, dbOptions);
foreach (MemcachedDTO d in dtoList)
{
await iR.SendAsync(d);
}
iR.Complete();
iR.LinkTo(bBlock);
await Task.WhenAll(actionList.Select(action => action.Completion).ToList());
return Request.CreateResponse(HttpStatusCode.OK, new { message = projectIds.ToString() + " reload success" });
}
catch (Exception ex)
{
return Request.CreateResponse(HttpStatusCode.InternalServerError, new { message = ex.Message.ToString() });
}
}
感謝您的詳細回覆@JSteward。我已經在上面的帖子中更新了我的代碼,並且仍然有問題...應用程序沒有完成(我刪除了PropagateCompletion),只寫了1/4的記錄。你能否指點我正確的方向?謝謝 – klkj898
跳出來的第一件事是:在將所有數據發送到流中之前,您沒有將「TransformBlock」鏈接到「BroadcastBlock」。當你修復那部分會發生什麼? – JSteward
另一種可能性是'PostEachServerAsync'未被等待,如果它正在運行'async',那麼它將被視爲火併遺忘,並且可能無法在流程完成時完成。 – JSteward