2016-04-29 53 views
0

嗨我正在嘗試處理文件中的數據。斯卡拉等待未來的列表執行

這是我在下面使用的代碼。

我有一份期貨清單,並試圖從這些期貨中獲得產出。

一切都很好,但返回的最後一行在OnSuccess之前執行。

如何在沒有阻止操作的情況下更改該行爲。

def processRow(rowNumber: Int, row: String, delimiter: String, rules: List[Rule]): RowMessage = { 
var cells = row.split(delimiter) 
var passedRules = new ListBuffer[RuleResult]() 
val failedRules = new ListBuffer[RuleResult]() 
val rulesFuture = rules.map { 
    i => Future { 
    val cells = row.split(delimiter); 
    //some processing.... 
    } 
} 
val f1 = Future.sequence(rulesFuture) 
f1 onComplete { 
    case Success(results) => for (result <- results) (result.map(x => { 
    if (x.isPassFailed) { 
     passedRules += x 
    } 
    else { 
     failedRules += x 
    } 
    })) 
    case Failure(t) => println("An error has occured: " + t.getMessage) 
} 
return new RowMessage(passedRules.toList, failedRules.toList) 
} 
+0

它與阿卡流有什麼關係? –

回答

1

你不能避免阻塞並返回一個普通的RowMessage。您還需要返回Future

def processRow(rowNumber: Int, row: String, delimiter: String, rules: List[Rule]): Future[RowMessage] = { 
    val cells = row.split(delimiter) 
    Future.traverse(rules) { i => 
    Future { 
     //some processing.... 
    } 
    } map { results => 
    val (passed, failed) = results.partition(_.isPassFailed) 
    new RowMessage(passed, failed) 
    } 
} 

也想想你的算法,以避免可變狀態,尤其是當你從不同的期貨更改它。

Future.traverse相當於您的map + Future.sequence。然後,而不是onComplete,只需映射Future修改列表。您可以使用partition輕鬆拆分它,而不是您一直在做的事情。

你不需要使用return,實際上你不應該除非你知道你在做什麼。

Btw isPassFailed對我來說聽起來不像一個合理的方法名稱,特別是考慮到如果它是真的,您將它添加到傳遞的規則。