2014-04-11 92 views
0

我正在寫一些東西,從SQL服務器加載到azure隊列的記錄。問題是,選擇結果中的項目數量可能非常大,因此我想在數據仍在檢索時開始排隊。如何做EF異步'分頁'處理選擇結果

我試圖利用EF6(異步),所有異步方法和和TPL並行排隊。所以,我有:

 // This defines queue that Generator will publsh to and 
     // QueueManager wil read from. More info: 
     // http://msdn.microsoft.com/en-us/library/hh228601(v=vs.110).aspx 
     var queue = new BufferBlock<ProcessQueueItem>(); 

     // Configure queue listener first 
     var result = this.ReceiveAndEnqueue(queue); 

     // Start generation process 
     var tasks = generator.Generate(batchId); 

的ReceiveAndEnqueue很簡單:

private async Task ReceiveAndEnqueue(ISourceBlock<ProcessQueueItem> queue) 
    { 
     while (await queue.OutputAvailableAsync()) 
     { 
      var processQueueItem = await queue.ReceiveAsync(); 
      await this.queueManager.Enqueue(processQueueItem); 
      this.tasksEnqueued++; 
     } 
    } 

發生器產生()的簽名如下:

public void Generate(Guid someId, ITargetBlock<ProcessQueueItem> target) 

目標後者調用SendAsync()方法放置新物品。我在做什麼,現在被劃分結果的總數爲「批次」,在加載它們,並異步送他們,直到全部完成:

public void Generate(Guid batchId, ITargetBlock<ProcessQueueItem> target) 
    { 
     var accountPromise = this.AccountStatusRepository.GetAccountsByBatchId(batchId.ToString()); 
     accountPromise.Wait(); 
     var accounts = accountPromise.Result; 

     // Batch configuration 
     var itemCount = accounts.Count(); 
     var numBatches = (int)Math.Ceiling((double)itemCount/this.batchSize); 
     Debug.WriteLine("Found {0} items what will be put in {1} batches of {2}", itemCount, numBatches, this.batchSize); 


     for (int i = 0; i < numBatches; i++) 
     { 
      var itemsToTake = Math.Min(this.batchSize, itemCount - currentIndex); 
      Debug.WriteLine("Running batch - skip {0} and take {1}", currentIndex, itemsToTake); 

      // Take a subset of the items and place them onto the queue 
      var batch = accounts.Skip(currentIndex).Take(itemsToTake); 

      // Generate a list of tasks to enqueue the items 
      var taskList = new List<Task>(itemsToTake); 
      taskList.AddRange(batch.Select(account => target.SendAsync(account.AsProcessQueueItem(batchId)))); 

      // Return the control when all tasks have been enqueued 
      Task.WaitAll(taskList.ToArray()); 

      currentIndex = currentIndex + this.batchSize; 
     } 

然而這工作,我的同事說 - 「可以難道我們使界面更簡單,讓生成(),使界面像這樣:

public Task<IEnumerable<ProcessQueueItem> Generate(Guid someId) 

會更加清晰,並沒有生成方法依賴於第三方物流庫。我完全同意,我只是affraid,如果我這樣做,我不得不打電話

var result = Generate().Wait().Result; 

在某些時候,enqueuinig所有項目之前。這將使我等待,直到所有的東西都被加載並存儲在內存中。

所以我的問題是:我怎麼才能開始使用EF查詢結果,只要他們'從選擇滴'?如果你趕上我的漂移,就好像EF會對結果產生「收益」。

編輯 我想我犯了一個思維錯誤。 EF默認加載項目。所以我可以將所有結果返回爲IQueryable <>但這並不意味着它們實際上是從數據庫加載的。然後我會遍歷它們並將它們排入隊列。

EDIT 2 不,那是不行的,因爲我需要在生成()方法從數據庫中變換對象...

+0

結果分頁應該使用正確的T-SQL語句完成。異步與語句如何執行有關,而不是分頁或結果的形狀。一個普通的舊同步SqlReader對於逐行消耗(和排隊)結果要好得多。 ORM(不僅僅是EF)對批處理操作(如你所描述的那樣)來說是不好的選擇 –

回答

0

OK,這是我結束了:

public IEnumerable<ProcessQueueItem> Generate(Guid batchId) 
    { 
     var accounts = this.AccountStatusRepository.GetAccountsByBatchId(batchId.ToString()); 

     foreach (var accountStatuse in accounts) 
     { 
      yield return accountStatuse.AsProcessQueueItem(batchId); 
     } 
    } 

存儲庫僅返回一些DataContext.Stuff.Where(...)的IEnumerable。生成器使用擴展方法將實體轉換爲域模型(ProcessQueueItem),該模型通過yield的方式立即發送給方法的調用者,該方法將開始調用QueueManager開始排隊。