2017-01-17 89 views
0

我有傳入的處理請求,其中我希望由於耗盡共享資源而不希望進行太多處理。我還希望共享一些獨特的關鍵要求不能同時執行:在RxJava/RxScala中組合groupBy和flatMap(maxConcurrent,...)

def process(request: Request): Observable[Answer] = ??? 

requestsStream 
    .groupBy(request => request.key) 
    .flatMap(maxConcurrentProcessing, { case (key, requestsForKey) => 
     requestsForKey 
     .flatMap(1, process) 
    }) 

然而,上述不起作用,因爲每個鍵可觀察到的永遠不會完成。什麼是實現這一目標的正確方法?

什麼不起作用:

.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) => 
     // Take(1) unsubscribes after the first, causing groupBy to create a new observable, causing the next request to execute concurrently 
     requestsForKey.take(1) 
     .flatMap(1, process) 
    }) 

.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) => 
     // The idea was to unsubscribe after 100 milliseconds to "free up" maxConcurrentProcessing 
     // This discards all requests after the first if processing takes more than 100 milliseconds 
     requestsForKey.timeout(100.millis, Observable.empty) 
     .flatMap(1, process) 
    }) 

回答

1

下面是如何成功地實現這一目標。對於每一個獨特的按鍵我指派專門的單一線程調度器(以便使用相同的密鑰信息,以便處理):

@Test 
public void groupBy() throws InterruptedException { 
    final int NUM_GROUPS = 10; 
    Observable.interval(1, TimeUnit.MILLISECONDS) 
      .map(v -> { 
       logger.info("received {}", v); 
       return v; 
      }) 
      .groupBy(v -> v % NUM_GROUPS) 
      .flatMap(grouped -> { 
       long key = grouped.getKey(); 
       logger.info("selecting scheduler for key {}", key); 
       return grouped 
         .observeOn(assignScheduler(key)) 
         .map(v -> { 
          String threadName = Thread.currentThread().getName(); 
          Assert.assertEquals("proc-" + key, threadName); 
          logger.info("processing {} on {}", v, threadName); 
          return v; 
         }) 
         .observeOn(Schedulers.single()); // re-schedule 
      }) 
      .subscribe(v -> logger.info("got {}", v)); 

    Thread.sleep(1000); 
} 

在我的情況下鍵(NUM_GROUPS)數量很小,所以我創建了專門的調度每個鍵:

Scheduler assignScheduler(long key) { 
    return Schedulers.from(Executors.newSingleThreadExecutor(
     r -> new Thread(r, "proc-" + key))); 
} 

如果鍵的數量是無限的或過大奉獻一個線程各一個,您可以創建調度池並重新使用它們像這樣:

Scheduler assignScheduler(long key) { 
    // assign randomly 
    return poolOfSchedulers[random.nextInt(SIZE_OF_POOL)]; 
} 
+0

ñ不會在我的情況下工作,因爲請求流是熱的和長期的並且包含許多不同的密鑰。此外,我寧願不創建額外的線程,但使用相同的線程池。 – dtech

+0

@dtech這個想法是,你不能將作業調度到特定的線程,但你可以安排他們到特定的調度程序,例如。單線程的。我在答案中添加了大量密鑰的解決方案。 –