我想處理ea。異步查詢提取(可能會對每個查詢進行多次提取)。爲了做到這一點,我將處理函數(返回Future
)傳遞給我的查詢方法來調用它。取。我不知道我的查詢的結果大小;我只知道我的抓取的最大尺寸。因此,我的查詢返回Observable
(而不是List
,例如,我需要事先知道尺寸)。唯一的問題是,當我使用Observable
create
或apply
時,它會在內部阻塞,直到我的Future
完成,然後它調用下一個onNext - 有效地消除了我希望從期貨中獲得的性能收益。工廠方法的Observable
from
不會阻塞,但需要Iterable
。我可以通過一個可變的Iterable
,並隨着新的進球進場而增長。有人擁有更引人注目的sol'n?下面的代碼:Scala Observable創建區塊我的期貨
object Repository {
def query(fetchSize: Int)(f: Set[Int] => Future[Set[Int]]): Observable[Future[Set[Int]]] = {
// observable (as opposed to list) because modeling a process
// where the total result size is unknown beforehand.
// Also, not creating or applying because it blocks the futures
val mut = scala.collection.mutable.Set[Future[Set[Int]]]()
val obs = Observable.from(mut)
1 to 2100 by fetchSize foreach { i =>
mut += f(DataSource.fetch(i, fetchSize))
}
obs
}
}
使用Observable生產期貨對我來說沒有任何意義。我認爲這是完全錯誤 – Nyavro
是'DataSource.fetch(i,fetchSize)'阻塞呼叫嗎? – zsxwing
@zsxwing,是的,它阻止。 – juanchito