0

我想處理ea。異步查詢提取(可能會對每個查詢進行多次提取)。爲了做到這一點,我將處理函數(返回Future)傳遞給我的查詢方法來調用它。取。我不知道我的查詢的結果大小;我只知道我的抓取的最大尺寸。因此,我的查詢返回Observable(而不是List,例如,我需要事先知道尺寸)。唯一的問題是,當我使用Observablecreateapply時,它會在內部阻塞,直到我的Future完成,然後它調用下一個onNext - 有效地消除了我希望從期貨中獲得的性能收益。工廠方法的Observablefrom不會阻塞,但需要Iterable。我可以通過一個可變的Iterable,並隨着新的進球進場而增長。有人擁有更引人注目的sol'n?下面的代碼:Scala Observable創建區塊我的期貨

object Repository { 
    def query(fetchSize: Int)(f: Set[Int] => Future[Set[Int]]): Observable[Future[Set[Int]]] = { 
    // observable (as opposed to list) because modeling a process 
    // where the total result size is unknown beforehand. 
    // Also, not creating or applying because it blocks the futures 
    val mut = scala.collection.mutable.Set[Future[Set[Int]]]() 
    val obs = Observable.from(mut) 
    1 to 2100 by fetchSize foreach { i => 
     mut += f(DataSource.fetch(i, fetchSize)) 
    } 
    obs 
    } 
} 
+0

使用Observable生產期貨對我來說沒有任何意義。我認爲這是完全錯誤 – Nyavro

+0

是'DataSource.fetch(i,fetchSize)'阻塞呼叫嗎? – zsxwing

+0

@zsxwing,是的,它阻止。 – juanchito

回答

0

我能夠通過使用foldLeft去除可變性:

(1 to 21 by fetchSize).foldLeft(Observable just Future((Set[Int]()))) { (obs, i) => 
    obs + f(DataSource.fetch(i)()) 
} 

其中:

implicit class FutureObservable(obs: Observable[Future[Set[Int]]]) { 
    def +(future: Future[Set[Int]]) = 
    obs merge (Observable just future) 
} 

的唯一的事情是,我不喜歡我什麼要做一個空的Observable,編譯器不抱怨。如果有人有更好的答案,請發佈它,我會標記它。

+0

你可以使用'reduceLeft',它會把它的初始累加器作爲第一個元素集合。如果沒有,你會得到例外。解決方案是'reduceLeftOption'。 –

+0

謝謝@Łukasz。我意識到我不需要使用'Observable'並用w /'Set'來代替。這是最後的[code](https://gist.github.com/mayonesa/a0f808c6c6f585a37155) – juanchito