2012-09-23 129 views
1

我在閱讀有關Rx並通過一些示例。我試圖執行此操作:Rx多個異步操作

List<A> LIstA {get;set;} 
List<B> LIstB {get;set;} 
List<C> LIstC {get;set;} 

private void GetItems() 
{ 
    ListA = GetItemsA(); 
    ListB = GetItemsB(); 
    ListC = GetItemsC(); 
} 

所有此代碼都在主線程上執行。我刪除了主線程(UI)使用此列表的代碼。我想要的是以異步方式獲取這些項目,但我需要以預定義順序進行的結果。

以異步方式運行它並不麻煩,但我在按預定義順序獲取結果時遇到問題。因此,我運行代碼,僅在此預定義序列中顯示UI,幾秒鐘後會顯示ListA填充,然後ListB,最後顯示ListC

如何實現這一目標?

回答

1

Taskasync可能比Rx​​更適合此模型。

不過,從我可以推斷出你的問題,使用CONCAT連接可觀察到的前一個完成後:

 Func<Action, IObservable<Unit>> fetch = 
      action => Observable.Defer(() => Observable.Start(action)); 

     fetch(() => A()) 
     .Concat(fetch(() => B())) 
     .Concat(fetch(() => C())) 
     .Subscribe(); 
+0

嗨阿斯蒂,我已經用Task實現了它。在這種情況下,我需要使用協程,並且這種複雜的一些其他的東西(在子類中重寫)。如果你知道使用任務的例子,而不使用協程,請讓我知道。至於你使用Rx的例子,單元究竟是什麼?它是A類,B類和C類的基類嗎?什麼是行動?從您發佈的代碼中不太清楚。 – Goran

+0

[單元](http://msdn.microsoft.com/en-us/library/system.reactive.unit(v = vs.103).aspx)在功能性編程中等同於'void'。它可以被認爲是執行不返回任何內容的方法的結果。第一行是避免重複的代表。你可以用'()=> ListA = GetItemsA()'替換'()=> A()'。 – Asti

+0

因此,現在我可以編譯,數據按預定義的順序異步檢索 - 工作正常。問題:我不能在UI(主線程)上使用LIstA和ListB,因爲接收到:調用線程不能訪問此對象,因爲不同的線程擁有它。「。我曾嘗試在.Subscribe()之前添加.ObserveOn(SynchronizationContext.Current) ,但它並沒有幫助。如何獲得結果列表回到主線程? – Goran

0

做這項工作的嗎?

GetItemsA().ToObservable() 
    .Zip(GetItemsB().ToObservable(), (a, b) => new { a, b, }) 
    .Zip(GetItemsC().ToObservable(), (ab, c) => new { ab.a, ab.b, c, }) 
    .Subscribe(abc => 
    { 
     ListA = abc.a; 
     ListB = abc.b; 
     ListC = abc.c; 
    }); 
+0

這確保在調用Subscribe之前執行所有這三件事,但這並不像OP請求 –

+0

@PaulBetts那樣按順序執行它們 - 我認爲這個任務必須是順序的。 – Enigmativity

0

根據你的問題,你有三個不同類型的集合。很顯然,人們不能共同分配三種不同類型的流。我覺得@ Enigmativity的是的allmost正確的,但爲了使它工作,你需要將其更改爲:

var uiScheduler = new SynchronizationContextScheduler(SynchronizationContext.Current); 
ListA = new ListA(); 
ListB = new ListB(); 
ListC = new ListC(); 

GetItemsA().ToObservable() 
.Zip(GetItemsB().ToObservable(), (a, b) => new { a, b, }) 
.Zip(GetItemsC().ToObservable(), (ab, c) => new { ab.a, ab.b, c, }) 
.ObserveOn(uiScheduler) 
.Subscribe(abc => 
{ 
    ListA.Add(abc.a); 
    ListB.Add(abc.b); 
    ListC.Add(abc.c); 
}); 

另一種解決方案可能是:

var a = Observable.Start(() => GetListA()); 
var b = Observable.Start(() => GetListB()); 
var c = Observable.Start(() => GetListC()); 

a.Zip(b, c, (x, y, z) => Tuple.Create(x, y, z)) 
    .ObserveOn(uiScheduler) 
    .Subscribe(result => 
       { 
        ListA = result.Item1; 
        ListB = result.Item2; 
        ListC = result.Item3; 
       }); 

我儘快與您創建它們想要的收藏:

a.ObserveOn(uiScheduler) 
    .Do(l => ListA = l) 
    .Zip(b, (la, lb) => lb) 
    .ObserveOn(uiScheduler) 
    .Do(l => ListB = l) 
    .Zip(c, (lb, lc) => lc) 
    .ObserveOn(uiScheduler) 
    .Subscribe(listc => ListC = listc); 
+0

這與這個問題是a,b和c同時評估。來自OP的問題:「我需要以預定義順序發佈結果。」 – Asti

0

你可以使用動態以及...:

void Main() 
{ 
    var q = 
    from @as in Observable.Start(() => GetItemsA()) 
    from bs in Observable.Start(() => GetItemsB()) 
    from cs in Observable.Start(() => GetItemsC()) 
    select @as.Cast<dynamic>().Concat(bs.Cast<dynamic>()).Concat(cs.Cast<dynamic>()); 
    q.Subscribe(t => t.Dump()); 
} 

// Define other methods and classes here 
private IEnumerable<int> GetItemsA() { 
    yield return 1; 
    Thread.Sleep(500); 
    yield return 2; 
    yield return 3; 
} 
private IEnumerable<string> GetItemsB() { 
    yield return "A"; 
    yield return "B"; 
    Thread.Sleep(1000); 
    yield return "C"; 
} 
private List<float> GetItemsC() { 
    return new List<float> { 1.1f, 2.2f, 3.3f }; 
}