2017-05-04 35 views
0

在過去的3-4個月裏,我發現,在默認情況下或者只是巧合,我所有的RxUI observables(.Subscribe)都是在各種對象的構造函數(代碼)中創建的。然而,該應用程序還大量使用後臺任務來監視各種流並採取行動(更新UI,創建事務等),並且有時這些任務需要中止某些事務。所以,我只是想知道哪裏是實例化中止指令(標誌)的「偵聽器」的最佳「地點」?在事務對象創建時,還是在監視流的「監督」對象內?如果在交易中,對績效和處置有什麼影響?我知道這是一個相當寬泛的問題,但我希望有人能夠推薦適當的模式。任何指針,評論贊賞。RxUI - 爲每個事務或流處理對象創建一個偵聽器會更好嗎?

請參閱下面的實體模型代碼:

using System; 
using System.Threading.Tasks; 
using System.Threading; 

using ReactiveUI; 

using UserSettingsClassLibrary; 

namespace fxR.UTILITIES.MockCode 
{ 
    public class MockRxUI_AbortTransaction : ReactiveObject { } // ignore this class 

    public class ManageTransactions : ReactiveObject 
    { 
     // properties 
     CancellationTokenSource ctTransactionPosition; 

     // constructor 
     public ManageTransactions() 
     { 
      // initialise 
      ctTransactionPosition = new CancellationTokenSource(); 
     } 

     // methods 
     public async Task OpenTransaction(MyStreamProcessingObject sPO, string user, string openTransactionAction) 
     { 
      // ... create transaction object, db log, etc 

      // open a 'listener' in case this Transaction is aborted 
      if (user.Contains("/")) 
      { 
       sPO.WhenAnyValue(x => x.ActiveObject.AbortTrade) 
        .Log(this, "TransactionOpen -> Abort trigger ") 
        .Subscribe(abort => 
        { 
         if (sPO.ActiveObject.TradeOpen && abort) 
         { 
          // abort 
         } 
        }); 
      } 
     } 
    } 

    // the OpenTransaction method above would be called from elsewhere in the app, eg 
    public class SomeOtherObject : ReactiveObject 
    { 
     ManageTransactions _manageTransactions; 
     MyStreamProcessingObject _sPO; 

     public SomeOtherObject(MyStreamProcessingObject sPO) 
     { 
      _sPO = sPO; 
      _manageTransactions = new ManageTransactions(); 
     } 

     public async Task DoSomethingAsync() 
     { 
      await _manageTransactions.OpenTransaction(_sPO, Environment.UserName + "/" + GetType().Name, "TransactionAction").ConfigureAwait(false); 
     } 
    } 
    // ================================================================================================ 
    // OR, open a 'listener' on the Stream Processing Object when it is constructed (first initialised) 
    // ================================================================================================ 

    public class MyStreamProcessingObject : ReactiveObject 
    { 
     // object properties 
     private ActiveObject _activeObject; 
     public ActiveObject ActiveObject 
     { 
      get { return _activeObject; } 
      set { this.RaiseAndSetIfChanged(ref _activeObject, value); } 
     } 

     // constructor 
     public MyStreamProcessingObject() 
     { 
      // listen for Transaction abort status 
      this.WhenAnyValue(x => x.ActiveObject.AbortTrade) 
       .Log(this, "Log -> ActiveObject.AbortTransaction") 
       .Subscribe(async abort => 
       { 
        if (abort) 
        { 
         await Task.Run(async() => await AbortOpenAutoTransaction().ConfigureAwait(false)); 
        } 
       }); 
     } 

     // methods 
     private async Task AbortOpenAutoTransaction() 
     { 
      var ok = false; 
      // await some abort code, if all ok, set ok = true; 
      if (ok) ActiveObject.AbortTrade = false; 
     } 
    } 
} 
+0

您的代碼沒有按更好沒有任何意義。你已經聲明'OpenTransaction'是異步的並返回類型'Task',但你沒有'await'調用或返回語句。訂閱電話顯然泄露了IDisposables。糟糕的代碼從一開始。 – bradgonesurfing

回答

0

我可以給你用的async/await的precence關於訂閱和一次性一些建議。您可以使用以下模式。在任務完成時

async Task<int> Foo(IObservable<bool> observable, Task<int> task){ 

    var subscription = observable.Subscribe 
     (x => Console.WriteLine("WhooHoo"); 

    using(subscription) 
    { 
     var value = await task; 
     return value + 10; 
    } 
} 

這相當於

async Task<int> Foo(IObservable<bool> observable, Task<int> task){ 

    var subscription = observable.Subscribe 
     (x => Console.WriteLine("WhooHoo"); 

    try 
    { 
     var value = await task; 
     return value + 10; 
    }finally{ 
     subscription.Dispose(); 
    } 
} 

的訂閱將被設置。

如果您發現您正在構造函數中創建訂閱,那麼您需要另一種模式。

public class Foo : ReactiveObject, IDisposable { 
    private CompositeDisposable _Cleanup = new CompositeDisposable(); 

    public Foo(IObservable<int> observable){ 
     var subscription = observable 
      .Subscribe(x=>Console.WriteLine("WhooHoo")); 
     _Cleanup.Add(subscription);  

    } 

    public void Dispose(){ 
     _Cleanup.Dispose(); 
    } 
} 

或使用RXUI擴展方法DisposeWith

public class Foo : ReactiveObject, IDisposable { 
    private CompositeDisposable _Cleanup = new CompositeDisposable(); 

    public Foo(IObservable<int> observable){ 
     observable 
      .Subscribe(x=>Console.WriteLine("WhooHoo")) 
      .DisposeWith(_Cleanup); 
    } 

    public void Dispose(){ 
     _Cleanup.Dispose(); 
    } 
} 

通常我創建一個基類

public class DisposableReactiveObject : ReactiveObject, IDisposable { 
    protected CompositeDisposable Cleanup { get; } = new CompositeDisposable; 
    public void Dispose(){ 
     _Cleanup.Dispose(); 
    } 
} 

,然後可以用它作爲

public class Foo : DisposableReactiveObject { 
    public Foo(IObservable<int> observable){ 

     observable 
      .Subscribe(x=>Console.WriteLine("WhooHoo")) 
      .DisposeWith(Cleanup); 

    } 
} 
+0

您推薦的圖案看起來不錯,我會用它來改善我的代碼。在我的例子中,一個問題是,StreamProcessingObject保持打開狀態,直到用戶關閉或退出應用程序,那麼這是否需要爲每個可觀察對象實現Dispose?據推測,垃圾收集將處理他們? – AzureGulf

+0

任何時候你打電話訂閱你會得到一個IDisposable,你必須決定如何處理。垃圾收集__不會將它們處置過。可以使用__using__語句或將其存儲在字段中以供日後處理。不允許作弊。 (同樣,如果你喜歡答案,請將其標記爲已接受) – bradgonesurfing

+0

就你的情況而言,'StreamProcessingObject'應實現'IDisposable'或'DisposableReactiveObject'並收集所有訂閱以我建議的模式處置。 – bradgonesurfing

相關問題