3

我有一個序列,我需要根據每個元素的條件重複它。例如,如果標有「失敗」標誌,我需要重新處理一個元素。我的問題是我找不到如何做while-loop操作。如何根據每個元素的條件循環可觀察序列?

TakeWhile幾乎是我所需要的,但它不會重演。

/* 
* The following lines are just an example to comprehend the idea 
*/ 
var observableSequence = sequence.ToObservable(); 
observableSequence 
    //This ´DoWhile´ did not worked because does not accept each element as argument 
    //and sequence at this point is not the same as `observableSequence` 
    .DoWhile(() => sequence.Any(item => !item.Failed)) 
    .Where(item => item.Failed == true) //OK here i could put another condition for limited retries... 
    .Subscribe(item => { 
     try{ 
      //Do stuff... 
      //. . . 
      item.Failed = false; 
     } catch 
     { 
      item.Failed = true; 
     } 
    }); 

回答

2

我建議將原始序列合併爲一個新的可觀察物體,因爲它們會在物體失敗時將對象提供給它們。

var retries = new ReplaySubject<Foo>(); 
var loopSequence = sequence.ToObservable().Merge(retries); 

loopSequence 
    .Where(item => item.Failed) 
    .Subscribe(item => 
    { 
     try{ 
      //Do stuff 
      item.Failed = false; 
     } catch 
     { 
      item.Failed = true; 
     } 
     retries.OnNext(item); 
    }); 

它通常被認爲是不好的做法,改變觀測對象的狀態,所以你可能要考慮創建轉換,而不是:

loopSequence 
    .Where(item => item.Failed) 
    .Subscribe(item => 
    { 
     try{ 
      //Do stuff 
      retries.OnNext(new Item { ..., Failed = false }); 
     } catch 
     { 
      retries.OnNext(new Item { ..., Failed = true }); 
     } 
    }); 

你也應該小心使用此模式,因爲不斷不合格項目會讓你的程序執行成爲一種無限循環。

+1

這是一個有趣的方法。你可以將這個項目包裝在一個'Tuple '中,每次你通過'retries'來增加'int'來跟蹤嘗試的次數。這會讓你有能力在x次嘗試後以不同的方式處理它。 –

+0

感謝您的回覆。當我第一次寫代碼時,我的第一種方法是從序列中刪除元素,很快我意識到它沒有奏效......我唯一的解決方案是將原始對象封裝在另一箇中,並添加一個「失敗」標誌。你的第二個代碼我無法理解。轉換是如何工作的(我的意思是他們在哪裏返回)? – CodeArtist

+0

@CodeArtist:對不起,該代碼是錯誤的,我現在修復它。這個想法是,不是改變每個項目的狀態,而是創建一個具有不同狀態的新項目。還要注意,當事情成功時,我添加一個新項目的唯一原因是,如果您需要訂閱'Failed == false'的結果。如果那不是你需要做的事情,你可以避免把它們加入到重試主題中。 – StriplingWarrior

1

你可以用每個值和獨立重試操作:

observableSequence 
    .SelectMany(item => 
    Observable.Return(item) 
     .Select(x => //Do Stuff) 
     //Optional argument, omitting retries infinitely until 
     //success 
     .Retry(3) 
) 

    .Subscribe(item => { 
    //Handle the results 
    }); 

可選Return需要IScheduler因爲這可能改變重試得到處理(遞歸與蹦牀)順序的第二個參數。