0
我有傳入的處理請求,其中我希望由於耗盡共享資源而不希望進行太多處理。我還希望共享一些獨特的關鍵要求不能同時執行:在RxJava/RxScala中組合groupBy和flatMap(maxConcurrent,...)
def process(request: Request): Observable[Answer] = ???
requestsStream
.groupBy(request => request.key)
.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
requestsForKey
.flatMap(1, process)
})
然而,上述不起作用,因爲每個鍵可觀察到的永遠不會完成。什麼是實現這一目標的正確方法?
什麼不起作用:
.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
// Take(1) unsubscribes after the first, causing groupBy to create a new observable, causing the next request to execute concurrently
requestsForKey.take(1)
.flatMap(1, process)
})
.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
// The idea was to unsubscribe after 100 milliseconds to "free up" maxConcurrentProcessing
// This discards all requests after the first if processing takes more than 100 milliseconds
requestsForKey.timeout(100.millis, Observable.empty)
.flatMap(1, process)
})
ñ不會在我的情況下工作,因爲請求流是熱的和長期的並且包含許多不同的密鑰。此外,我寧願不創建額外的線程,但使用相同的線程池。 – dtech
@dtech這個想法是,你不能將作業調度到特定的線程,但你可以安排他們到特定的調度程序,例如。單線程的。我在答案中添加了大量密鑰的解決方案。 –