2013-03-08 67 views
3

我試圖拼湊出的Rx流水線的工作方式如下:遞歸/扇出在無擴展

  1. 我寫了一個函數,它在提供的IObservable我包含有關信息檔案公司
  2. 我查詢各種數據源以查找潛在的相關公司配置文件,並行查詢。我將它合併成一個公司簡介的單個IObservable。
  3. 當我拿回這些潛在相關的配置文件,我把它們比作是我已經觀察到型材和,如果他們有針對性> 80%,是不一樣的,因爲我已經觀察到的任何個人資料,我認爲他們比賽。
  4. 我想匹配的公司反饋到第1步,這樣我可以搜索相關的數據,這些新的配套文件。

我引導了一些已知的良好配置文件的過程。

最後,還有那些尚未看到任何更多的匹配型材等過程結束。

我無法編程此。如果我使用Subject來允許管道尾端將其配置文件發送到工作流程的開始,那麼沒有人會調用OnCompleted,並且我從未發現該流程已經結束。如果我用遞歸來開發這個,我似乎總是以堆棧溢出結束,因爲我試圖用自己的返回值來調用一個函數。

誰能幫我我怎麼能在某種程度上,我可以確定的過程已經結束完成這項任務?

+0

如果我是用硬件的術語,你在尋找一個多路複用器,解複用器? – JerKimball 2013-03-08 19:41:25

+0

@JerKimball我不確定你在這方面的含義。 – 2013-03-08 22:07:06

+0

嗯......讓我嘗試重寫:你有多個源,你想把它們打成一個單一的流,然後從這個單一流中「解除」出現回到相應的多個流? – JerKimball 2013-03-08 22:28:26

回答

5

這聽起來像你想這樣的數據流:

seed profiles --> source --> get related --> output 
        ^     | 
        |     v 
        -<--- transform <----- 

這看起來是解決地方的一般問題是不是特定的一個容易或更容易的情況下,所以我會提出一個通用的「反饋「應該給你的積木功能,你需要:

編輯:固定函數來完成

IObservable<TResult> Feedback<T, TResult>(this IObservable<T> seed, 
              Func<T, IObservable<TResult>> produce, 
              Func<TResult, IObservable<T>> feed) 
    { 
     return Observable.Create<TResult>(
       obs => 
       { 
        var ret = new CompositeDisposable(); 
        Action<IDisposable> partComplete = 
         d => 
         { 
          ret.Remove(d); 
          if (ret.Count == 0) obs.OnCompleted(); 
         }; 
        Action<IObservable<T>, Action<T>> ssub = 
         (o, n) => 
         { 
          var disp = new SingleAssignmentDisposable(); 
          ret.Add(disp); 
          disp.Disposable = o.Subscribe(n, obs.OnError,() => partComplete(disp)); 
         }; 
        Action<IObservable<TResult>, Action<TResult>> rsub = 
         (o, n) => 
         { 
          var disp = new SingleAssignmentDisposable(); 
          ret.Add(disp); 
          disp.Disposable = o.Subscribe(n, obs.OnError,() => partComplete(disp)); 
         }; 

        Action<T> recurse = null; 
        recurse = s => 
           { 
            rsub(produce(s), 
             r => 
             { 
              obs.OnNext(r); 
              ssub(feed(r), recurse); 
             }); 
           }; 

        ssub(seed, recurse); 
        return ret; 
       }); 
    } 

在你的情況下,TTResult看起來是一樣的,所以feed將是身份函數。 produce將用於實施步驟2和3

我測試了這個功能的一些示例代碼的功能:

void Main() 
{ 
    var seed = new int[] { 1, 2, 3, 4, 5, 6 }; 
    var found = new HashSet<int>(); 
    var mults = seed.ToObservable() 
        .Feedback(i => 
           { 
            return Observable.Range(0, 5) 
             .Select(r => r * i) 
             .TakeWhile(v => v < 100) 
             .Where(v => found.Add(v)); 
           }, 
           i => Observable.Return(i)); 

    using (var disp = mults.Dump()) 
    { 
     Console.WriteLine("Press any key to stop"); 
     Console.ReadKey(); 
    } 
    Console.WriteLine("Press any key to exit"); 
    Console.ReadKey(); 
} 

static IDisposable Dump<T>(this IObservable<T> source) 
{ 
    return source.Subscribe(item => Console.WriteLine(item), 
          ex => Console.WriteLine("Error occurred in dump observable: " + ex.ToString()), 
          () => Console.WriteLine("Dump completed")); 
} 
+0

這是非常接近的,但是,我也需要知道什麼時候該過程「停止」。由於每次我遞歸過濾併發回比我收到的結果更少的結果,最終該過程將0個項目發送回開始步驟,並且過程完成。我需要能夠在不使用任意超時的情況下檢測到這一點。 – 2013-03-09 12:07:46

+0

@DavidPfeffer我甚至沒有想到完成。與原始版本的問題當然是向前和向後觀察量都在等待對方纔完全完成。我已經編輯過包含另一個跟蹤優秀觀察對象的版本,並在沒有剩餘觀察對象時完成。在徹底的測試之前,有可能存在競爭條件,但我的基本測試仍然有效。 – 2013-03-09 17:56:33

+0

更接近!我最後剩下的問題是,因爲沒有建立管道,而是一個函數被遞歸調用,我不能使用狀態的運營商。例如,「變換」步驟的圖中不能有意義地包括一個'Distinct',因爲它只能在單個批次的結果,而不是整個過程的執行操作。 – 2013-03-10 11:48:17