1

我是TPL Dataflow的新手。我有一個需要處理的項目編號列表。一個項目可能有大約8000項目,我需要獲取項目中每個項目的數據,然後將這些數據推送到5個獨立的服務器中。TPL數據流 - 如何調用行動項目多個項目

這是我到目前爲止編碼。我一直堅持如何將這些數據加載到5臺服務器的步驟。我不確定這是否正確編碼。任何意見非常感謝。

public static bool PushData(string projectId) 
{ 
    var linkCompletion = new DataflowLinkOptions 
    { 
     PropagateCompletion = true 
    }; 

    var projectItems = new TransformBlock<ProjectDTO, ProjectDTO>(
     dto => dto.GetItemData(dto), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 }); 

    var itemData = new ActionBlock<ProjectDTO>(
     dto => PostEachServerAsync(dto, "server1", "setmemcache"), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 }); 


    projectItems.LinkTo(projectRules, linkCompletion); 

    IList<ProjectDTO> dtoList = new List<ProjectDTO>(); 
    dtoList = MemcachedDTO.GetDataByProject(projectId); 

    foreach (ProjectDTOd in dtoList) 
    { 
     projectItems.Post(d); 
    } 

    projectItems.Complete(); 
    projectItems.Completion.Wait(); 
    return false; 
} 

這是我的代碼現在 - 但它不能正確完成 - 誰能告訴我我做錯了什麼?

   [HttpGet] 
    public HttpResponseMessage ReloadItem(string projectQuery) 
    { 
     try 
     { 

      var linkCompletion = new DataflowLinkOptions 
      { 
       PropagateCompletion = true 
      }; 

      IList<string> projectIds = projectQuery.Split(',').ToList(); 
      IEnumerable<string> serverList = ConfigurationManager.AppSettings["ServerList"].Split(',').Cast<string>(); 

      var iR = new TransformBlock<MemcachedDTO, MemcachedDTO>(
       dto => dto.GetIR(dto), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 }); 

      var pR = serverList.Select(
        i => new { Id = i, Action = new ActionBlock<MemcachedDTO>(dto => PostEachServerAsync(dto, i, "set"), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 }) }); 

      List<MemcachedDTO> dtoList = new List<MemcachedDTO>(); 

      foreach (string pid in projectIds) 
      { 
       IList<MemcachedDTO> dtoTemp = new List<MemcachedDTO>(); 
       dtoTemp = MemcachedDTO.GetItemIdsByProject(pid); 
       dtoList.AddRange(dtoTemp); 
      } 


      foreach (var action in pR) 
      { 
       iR.LinkTo(action.Action, linkCompletion); 
      } 

      foreach (MemcachedDTO d in dtoList) 
      { 
       iR.Post(d); 
      } 
      iR.Complete(); 
      foreach (var action in pR) 
      { 
       action.Action.Completion.Wait(); 
      } 


      return Request.CreateResponse(HttpStatusCode.OK, new { message = projectIds.ToString() + " reload success" }); 
     } 
     catch (Exception ex) 
     { 
      return Request.CreateResponse(HttpStatusCode.InternalServerError, new { message = ex.Message.ToString() }); 
     } 
    } 
+0

如果從您的郵件的1000,你只拿到了998,做檢查的錯誤,也有例外你不知道 – VMAtm

回答

1

你的代碼根本不編譯,你如何運行它?

首先,不要用.Wait()阻止你的線程,在這裏使用async/await pattern。其次,你需要一個BroadcastBlock來通知你的數據超過1個塊。第三,你需要5個不同的ActionBlock s,而不是1和平行度5.第四,你等待錯誤Completion任務 - 等待最後一個塊完成,而不是第一個,所以在你的情況下,你需要等待5塊用WhenAll method完成。

所以,你的代碼可能是這樣的(我假設projectRulesitemsData是相同的塊):

​​
+0

謝謝你的回覆 - 項目項目與itemdata不在同一個塊中 - 這是因爲1)我將首先獲得與每個項目編號對應的列表項目2)獲取項目基於來自步驟1的項目ID的數據)。之後,我需要將這些數據推送到5臺服務器。我有點失落,我可以迭代項目ID來獲得項目ID,然後是與每個項目ID相對應的數據。 – klkj898

+0

這裏是我的代碼 - 但它沒有正確完成 - 我做錯了什麼? – klkj898

+0

1.您沒有使用BroadcastBlock,因此只有第一個塊纔會收到消息。 2.你用'Wait'阻止你的線程,這很糟糕,使用'async'await'。 3. SO不適合爲您編譯代碼。你明白了,你做到了。 – VMAtm