2012-01-01 61 views
3

我需要並行執行多個長時間運行的操作,並希望以某種方式報告進度。從我最初的研究看來,IObservable似乎適合這個模型。這個想法是,我調用了一個返回int的IObservable的方法,其中int報告完成百分比,並行執行在退出方法時立即開始,此可觀察值必須是熱可觀察值,以便所有訂戶在特定時間點學習相同的進度信息,例如遲到的用戶可能只會知道整個執行過程已經完成,並且沒有更多的跟蹤進度。.Net RX:跟蹤並行執行的進度

我發現的最接近這個問題的方法是使用Observable.ForkJoin和Observable.Start,但我無法理解如何使它們成爲我可以從方法返回的單個可觀察值。

請分享你的想法如何實現,或者有另一種方法來解決這個問題使用.Net RX。

回答

3

爲了讓熱的可觀的,我可能會與使用BehaviorSubject作爲返回值和操作報告進度的方式方法入手。如果你只是想要這個例子,跳到最後。這個答案的其餘部分解釋了這些步驟。

爲了這個答案,我會假設你的長時間運行沒有自己的方法來異步調用。如果他們這樣做,下一步可能會有所不同。接下來要做的工作是使用IScheduler將工作發送到另一個線程。如果需要,您可以允許調用者選擇將工作調度器作爲參數的過載(在這種情況下,過載不會選擇默認調度器),從而選擇工作發生的位置。 IScheduler.Scheduler有很多重載,其中有幾個是擴展方法,所以你應該通過它們來查看哪些最適合你的情況。我在這裏只用了Action。如果您有多個可以並行運行的操作,則可以多次撥打scheduler.Schedule

其中最難的部分可能是確定在任何給定點上的進展。如果您有多個操作同時進行,那麼您可能需要跟蹤已完成的操作數,以瞭解當前的進度。根據你提供的信息,我不能比這更具體。

最後,如果您的操作可以取消,您可能需要將CancellationToken作爲參數。在啓動之前,您可以使用它來取消調度程序隊列中的操作。如果您正確編寫操作代碼,它也可以使用令牌進行取消。

IObservable<int> DoStuff(/*args*/, 
         CancellationToken cancel, 
         IScheduler scheduler) 
{ 
    BehaviorSubject<int> progress; 
    //if you don't take it as a parameter, pick a scheduler 
    //IScheduler scheduler = Scheduler.ThreadPool; 

    var disp = scheduler.Schedule(() => 
    { 
     //do stuff that needs to run on another thread 

     //report progres 
     porgress.OnNext(25); 
    }); 
    var disp2 = scheduler.Schedule(...); 

    //if the operation is cancelled before the scheduler has started it, 
    //you need to dispose the return from the Schedule calls 
    var allOps = new CompositeDisposable(disp, disp2); 
    cancel.Register(allOps.Dispose); 

    return progress; 
} 
+0

這是一個很好的示例和解釋。謝謝。我只有一個問題。如果我的長時間運行的操作被分成幾組,並且我需要在並行執行組內操作的同時連續執行組?我應該在這種情況下使用任務庫,允許通過ContinueWith而不是調度器鏈接這種類型,但保留使用BehaviorSubject? – andriys 2012-01-04 13:30:04

+1

@andriys這是一個選項。另一種選擇是將每個組劃分爲一個返回IObservable的單獨函數。這些函數可以通過'Observable.Concat'鏈接在一起。但是,你會希望除了鏈條中的第一個可觀察對象之外的其他所有元素都變冷。 '觀察到。Defer'可以讓你將一個熱門的觀察者轉換成一個冷的觀察者。 – 2012-01-04 14:00:15

+0

太好了,謝謝你的指導! – andriys 2012-01-04 15:15:33

0

這是一種方法

// setup a method to do some work, 
// and report it's own partial progress 
Func<string, IObservable<int>> doPartialWork = 
    (arg) => Observable.Create<int>(obsvr => { 
     return Scheduler.TaskPool.Schedule(arg,(sched,state) => { 
      var progress = 0; 
      var cancel = new BooleanDisposable(); 
      while(progress < 10 && !cancel.IsDisposed) 
      { 
       // do work with arg 
       Thread.Sleep(550); 
       obsvr.OnNext(1); //report progress 
       progress++; 
      } 
      obsvr.OnCompleted(); 
      return cancel; 
     }); 
    }); 

var myArgs = new[]{"Arg1", "Arg2", "Arg3"}; 

// run all the partial bits of work 
// use SelectMany to get a flat stream of 
// partial progress notifications 
var xsOfPartialProgress = 
     myArgs.ToObservable(Scheduler.NewThread) 
       .SelectMany(arg => doPartialWork(arg)) 
        .Replay().RefCount(); 

// use Scan to get a running aggreggation of progress 
var xsProgress = xsOfPartialProgress 
        .Scan(0d, (prog,nextPartial) 
          => prog + (nextPartial/(myArgs.Length*10d))); 
+0

您的函數創建一個冷觀察。這項工作並不是從對「doPartialWork」的初始調用開始,而是從每個*訂閱返回的可觀察事件開始。 – 2012-01-01 17:40:01

+0

@Gideon好點 - 修改回答 – 2012-01-01 17:54:57

+0

更好,但仍然很冷。 – 2012-01-01 20:23:40