2015-05-26 160 views
2

基本上我正在cassandra上運行兩個期貨查詢,然後我需要做一些計算並返回值(平均值)。卡住與斯卡拉期貨

這裏是我的代碼:

object TestWrapFuture { 
    def main(args: Array[String]) { 
    val category = 5392 
    ExtensiveComputation.average(category).onComplete { 
     case Success(s) => println(s) 
     case Failure(f) => throw new Exception(f) 
    } 
    } 
} 

class ExtensiveComputation { 

    val volume = new ListBuffer[Int]() 

    def average(categoryId: Int): Future[Double] = { 

    val productsByCategory = Product.findProductsByCategory(categoryId) 

    productsByCategory.map { prods => 
     for (prod <- prods if prod._2) { 
     Sku.findSkusByProductId(prod._1).map { skus => 
      skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get)) 
     } 
     } 

     val average = volume.sum/volume.length 
     average 
    } 
    } 
} 

object ExtensiveComputation extends ExtensiveComputation 

那麼是什麼問題?

skus.foreach將結果值附加到ListBuffer中。因爲一切都是異步的,所以當我嘗試在我的主體中獲得結果時,我得到一個錯誤,說我不能被零除。

事實上,由於我的Sku.findSkusByProduct返回一個Future,當我嘗試計算平均值時,該卷是空的。

我應該在計算之前阻止任何事情,還是應該做其他事情?

編輯

嗯,我試圖阻止這樣的:

val volume = new ListBuffer[Int]() 

    def average(categoryId: Int): Future[Double] = { 

    val productsByCategory = Product.findProductsByCategory(categoryId) 

    val blocked = productsByCategory.map { prods => 
     for (prod <- prods if prod._2) { 
     Sku.findSkusByProductId(prod._1).map { skus => 
      skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get)) 
     } 
     } 
    } 

    Await.result(blocked, Duration.Inf) 
    val average = volume.sum/volume.length 
    Future.successful(average) 
    } 

然後我從這段代碼兩種不同的結果:

Sku.findSkusByProductId(prod._1).map { skus => 
      skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get)) 
     } 

1 - 當有隻有幾個像cassandra擡頭看50,它只是運行,並給我結果

2 - 當有許多像1000,它給了我

java.lang.ArithmeticException:/零

EDIT 2

我想這個代碼@Olivier Michallat提議

def average(categoryId: Int): Future[Double] = { 

    val productsByCategory = Product.findProductsByCategory(categoryId) 

    productsByCategory.map { prods => 
     for (prod <- prods if prod._2) findBlocking(prod._1) 
     volume.sum/volume.length 
    } 
    } 

    def findBlocking(productId: Long) = { 
    val future = Sku.findSkusByProductId(productId).map { skus => 
     skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get)) 
    } 

    Await.result(future, Duration.Inf) 
    } 

而下面這個作爲@kolmar的提議:

def average(categoryId: Int): Future[Int] = { 
    for { 
     prods <- Product.findProductsByCategory(categoryId) 
     filtered = prods.filter(_._2) 
     skus <- Future.traverse(filtered)(p => Sku.findSkusByProductId(p._1)) 
    } yield { 
     val volumes = skus.flatten.map(sku => sku.height.get * sku.width.get * sku.length.get) 
     volumes.sum/volumes.size 
    } 
    } 

兩項工程的幾個單品找到像50,但都失敗了許多的SKU找到像1000投擲ArithmeticException:/零

看來,它不能返回未來之前計算的一切...

回答

3

您需要等到findSkusByProductId生成的所有期貨在計算平均值之前完成。因此,積累所有這些期貨在Seq,調用Future.sequence就可以得到Future[Seq],然後將該未來映射到計算平均值的函數。然後將productsByCategory.map替換爲flatMap

+0

請結帳我的更新... –

2

由於您必須調用返回Future參數序列的函數,因此最好使用Future.traverse

例如:

object ExtensiveComputation { 
    def average(categoryId: Int): Future[Double] = { 
    for { 
     products <- Product.findProductsByCategory(categoryId) 
     filtered = products.filter(_._2) 
     skus <- Future.traverse(filtered)(p => Sku.findSkusByProductId(p._1)) 
    } yield { 
     val volumes = skus.map { sku => 
     sku.height.get * sku.width.get * sku.length.get } 
     volumes.sum/volumes.size 
    } 
    } 
} 
+0

我剛上添加一個skus.flatten.map扁平化,因爲它是一個序列[序列[INT]]。當只有幾個skus可以找到時,這是有效的。當我把更多,如1000它給我java.lang.ArithmeticException:/由零 –