2015-11-05 97 views
2

我正在學習RxScala並來到這個非常合成的片段。我試圖處理例外的onError塊:可觀察的異常處理

def doLongOperation():String = { 
    (1 to 10).foreach { 
    _ => 
     Thread.sleep(100) 
     print(".") 
    } 
    println() 
    if (System.currentTimeMillis() % 2 == 0) { 
    throw new RuntimeException("Something went wrong during long operation") 
    } 
    s"OK" 
} 

def main(args: Array[String]) { 
    println("Changing status: -> doing task1") 
    Observable.just(
    doLongOperation() 
).subscribe(
    str => println("Changing status: doing task1 -> doing task2"), 
    throwable => println(s"Failed: ${throwable.getMessage}"),  //never get here 
    () => println("Completed part") 
) 
} 

在例外的情況下,我希望是這樣的:

Failed: Something went wrong during long operation 

但我得到的是:

.........Exception in thread "main" java.lang.RuntimeException: Something went wrong during long operation 
at stats.STest$.doLongOperation(STest.scala:20) 
at stats.STest$.main(STest.scala:49) 
at stats.STest.main(STest.scala) 

我是什麼失蹤?我應該在觀察者處「手動」調用onError嗎?感謝您的幫助。

回答

1

問題是just()的曲解。它在組裝序列時需要一個現有值,而不是在用戶訂閱時執行的方法。換句話說,你的代碼是這樣的:

var tempValue = doLongOperation(); 

Observable.just(tempValue).subscribe(...) 

並且拋出一個Observable被創建之前的方式。

(對不起,我不知道斯卡拉或RxScala足以使原諒我的Java 8的例子。)

我不知道RxScala多遠的背後是RxJava,但RxJava 1.0.15有一個新的工廠方法,fromCallable,讓你推遲一個值:

Observable.fromCallable(() -> doLongOperation()).subscribe(...) 

另一種方法是原始源包裝成一個defer所以當doLongOperation拋出,它被路由到用戶:

Observable.defer(() -> Observable.just(doLongOperation())).subscribe(...) 
+0

非常感謝!現在事情變得更加清晰了 – Nyavro

1

Observable.just不能很好地處理異常情況,不確定這是錯誤還是預期的行爲。您可以嘗試這種方式,但:

Observable.create[String](o => { 
    o.onNext(doLongOperation()) 
    o.onCompleted() 
    Subscription{} 
}).subscribe(
    str => println("Changing status: doing task1 -> doing task2"), 
    throwable => println(s"Failed: ${throwable.getMessage}"), here 
() => println("Completed part") 
) 
+0

非常感謝!該解決方案完美無缺! – Nyavro