2015-11-15 19 views
0

使用 Scala 2.11.7、rxscala_2.11 0.25.0、rxjava 1.0.16 時,我在 AsyncDisjointedChunkMultiprocessing.process() 中對 oddFutures 註冊的回調沒有被調用(也就是說:由 Scala Future 衍生出的 Observable,其回調沒有被調用)。代碼如下:

package jj.async 

import scala.concurrent.Future 
import scala.concurrent.ExecutionContext 
import rx.lang.scala.Observable 
import jj.async.helpers._ 

/* Problem: How to multi-process records asynchronously in chunks. 
    Processing steps: 
    - fetch finite # of records from a repository (10 at-a-time (<= 10 for last batch) because of downstream limitations) 
    - process ea. chunk through a filter asynchronously (has 10-record input limit) 
    - compute the reverse of the filtered result 
    - enrich (also has 10-record input limit) filtered results asynchronously 
    - return enriched filtered results once all records are processed 
*/ 
/** Entry point of the chunked pipeline: takes the odd numbers produced per chunk by
  * `oddFutures`, regroups them into batches of `chunkSize`, and enriches each batch.
  */
object AsyncDisjointedChunkMultiprocessing {
    private implicit val ec = ExecutionContext.global

    /** Regroups the odd numbers emitted by `oddFutures` into batches of `chunkSize`
      * elements, enriches every batch through the `+` extension on
      * `Observable[Set[Enriched]]`, and returns the flattened enriched records.
      *
      * NOTE(review): `foreach` below looks like it only registers the callbacks; if they
      * fire asynchronously, the last line may read `enrichedFutures` before any batch has
      * been merged into it. Confirm against RxScala's subscription semantics.
      *
      * @return every `Enriched` contained in `enrichedFutures` at the time it is read
      */
    def process(): List[Enriched] = {
    // Odds left over from earlier chunks that did not yet fill a whole batch.
    @volatile var oddsBuffer = Set[Int]()
    // Seeded with an empty set so there is always something to merge onto.
    @volatile var enrichedFutures = Observable just Set[Enriched]()
    oddFutures.foreach(
     odds =>
     if (odds.size + oddsBuffer.size >= chunkSize) {
      // Number of new odds needed to top the buffer up to exactly chunkSize.
      val chunkReach = chunkSize - oddsBuffer.size
      // The parentheses matter: alphanumeric infix operators bind loosest, so the
      // unparenthesised form means `(oddsBuffer ++ odds) take chunkReach`, which
      // yields only chunkReach elements and silently drops buffered odds.
      val poors = oddsBuffer ++ (odds take chunkReach)
      enrichedFutures = enrichedFutures + poors
      // Whatever did not fit into this batch becomes the new carry-over.
      oddsBuffer = odds drop chunkReach
     } else {
      oddsBuffer ++= odds
     },
     error => throw error,
     () => {
      // Flush the final partial batch. The result has to be assigned back; merely
      // evaluating `enrichedFutures + oddsBuffer` throws the merged observable away.
      if (oddsBuffer.nonEmpty) enrichedFutures = enrichedFutures + oddsBuffer
     })
    enrichedFutures.toBlocking.toList.flatten
    }

    /** One set per repository chunk, holding the chunk's elements that the remote
      * even-filter did not return (i.e. the odd ones).
      */
    private def oddFutures: Observable[Set[Int]] =
    Repository.query(chunkSize) { chunk =>
     evenFuture(chunk) map {
     filtered => chunk -- filtered
     }
    }

    /** Validates the chunk size, then runs the remote even-filter inside a `Future`. */
    private def evenFuture(chunk: Set[Int]): Future[Set[Int]] = {
    checkSizeLimit(chunk)
    Future { Remote even chunk }
    }
}

/** Result record produced by enriching a single Int. */
class Enriched(i: Int)

/** Factory and batch-enrichment operations for [[Enriched]]. */
object Enriched {
    /** Lets callers write `Enriched(n)` instead of `new Enriched(n)`. */
    def apply(i: Int) = new Enriched(i)

    /** Enriches a batch of Ints.
      *
      * Runs `checkSizeLimit` on the batch first, then sleeps for one second before
      * wrapping each element.
      *
      * @param poors the not-yet-enriched values
      * @return one `Enriched` per input element
      */
    def enrich(poors: Set[Int]): Set[Enriched] = {
        checkSizeLimit(poors)
        Thread.sleep(1000)
        poors.map(n => Enriched(n))
    }
}

/** Stand-in repository that feeds three hard-coded chunks (1-10, 11-20, 21-25) to `f`. */
object Repository { 
    // `fetchSize` is accepted but never read in this body.
    // `f` is invoked once per hard-coded chunk and returns a Future of the processed chunk.
    def query(fetchSize: Int)(f: Set[Int] => Future[Set[Int]]): Observable[Set[Int]] = { 
    // presumably the implicit ExecutionContext required by Observable.from(Future) -- confirm
    implicit val ec = ExecutionContext.global 
    // NOTE(review): the braces below form ONE block expression, so only the value of its
    // last statement (the third f(...) call) is handed to Observable.from. The first two
    // f(...) calls still run, but the futures they return are discarded.
    Observable.from { 
     Thread.sleep(20) 
     f(Set(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) 
     Thread.sleep(20) 
     f(Set(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)) 
     Thread.sleep(15) 
     f(Set(21, 22, 23, 24, 25)) 
    } 
    } 
} 

/** Shared constants, syntax extensions and guards for the chunk-processing example. */
package object helpers {
    /** Maximum number of elements allowed in a single batch. */
    val chunkSize = 10

    /** Adds a `+` operator to an observable of enriched sets. */
    implicit class EnrichedObservable(enrichedObs: Observable[Set[Enriched]]) {
        /** Enriches `poors` right away (on the calling thread) and merges the result,
          * as a single-element observable, into the wrapped observable.
          */
        def +(poors: Set[Int]): Observable[Set[Enriched]] = {
            val enrichedBatch = Enriched.enrich(poors)
            enrichedObs.merge(Observable.just(enrichedBatch))
        }
    }

    /** Throws `IllegalArgumentException` when `set` holds more than `chunkSize` elements;
      * otherwise does nothing.
      */
    def checkSizeLimit(set: Set[_ <: Any]) =
        if (set.size <= chunkSize) ()
        else throw new IllegalArgumentException(s"$chunkSize-element limit violated: ${set.size}")
}

// unmodifiable 
/** Stand-in for the downstream filter; the surrounding text marks it as not to be changed. */
object Remote { 
    // Returns a function value rather than taking the set as a parameter, which is why
    // the caller writes `Remote even chunk`.
    def even = { xs: Set[Int] => 
    // Fixed 1.5 s pause before the result is produced.
    Thread.sleep(1500) 
    // Keep only the even elements.
    xs filter { _ % 2 == 0 } 
    } 
} 

我在 Repository.query() 中用 Observable.from(Future) 創建 Observable 的方式有問題嗎?

回答

0

問題在於我試圖從多個 Future 創建一個 Observable,但 Observable.from(Future) 只接受單個 Future(編譯器沒有報錯,因爲我不小心漏掉了分隔用的逗號:花括號裏的內容因此成了一個代碼塊,只有最後一個表達式的值被當作參數,於是意外匹配到了接受單個 Future 的重載)。我的解決方案:

/** Repository that exposes every chunk's in-flight `Future` as an element of an observable. */
object Repository {
    /** Fetches chunks starting at 1, 1 + fetchSize, ... (start index at most 21), applies
      * `f` to each chunk, and merges the resulting futures into one observable.
      *
      * The observable is seeded with an already-completed future of an empty set, so
      * consumers see one empty element in addition to the real chunks.
      *
      * @param f         asynchronous processing applied to every fetched chunk
      * @param fetchSize both the stride between chunk start indices and the width of each
      *                  chunk; the original passed nothing to `DataSource.fetch`, so any
      *                  value other than the default 10 produced overlapping or gapped chunks
      * @return an observable of the per-chunk futures
      */
    def query(f: Set[Int] => Future[Set[Int]])(fetchSize: Int = 10): Observable[Future[Set[Int]]] =
    // observable (as opposed to list) because modeling a process
    // where the total result size is unknown beforehand.
    // Also, not creating or applying because it blocks the futures
    // The seed is a pre-completed future: no ExecutionContext is needed for a known value.
    (1 to 21 by fetchSize).foldLeft(Observable.just(Future.successful(Set.empty[Int]))) { (obs, i) =>
     obs + f(DataSource.fetch(i)(fetchSize))
    }
}

/** Fake data source that serves consecutive integers after a short delay. */
object DataSource {
    /** Returns the integers `begin` (inclusive) to `begin + fetchSize` (exclusive) as a set,
      * after sleeping 200 ms.
      *
      * @param begin     first value of the chunk
      * @param fetchSize number of consecutive values to return
      */
    def fetch(begin: Int)(fetchSize: Int = 10) = {
        val upperExclusive = begin + fetchSize
        Thread.sleep(200)
        (begin until upperExclusive).toSet
    }
}

其中:

/** Adds a `+` operator to an observable of futures. */
implicit class FutureObservable(obs: Observable[Future[Set[Int]]]) {
    /** Merges `future`, wrapped as a single-element observable, into the wrapped observable. */
    def +(future: Future[Set[Int]]) = {
        val single = Observable.just(future)
        obs.merge(single)
    }
}