2016-04-14 80 views
1

功能,我想從Observable就像這樣的數組創建一個Observable調用(超載)RxJava從斯卡拉

package rxtest 

import concurrent._ 
import concurrent.ExecutionContext.Implicits.global 

import rx.lang.scala._ 
import rx.lang.scala.JavaConversions._ 
import rx.lang.scala.schedulers._ 

object A extends App { 
    val ps = Array.fill(3)(Promise[Int]()) 
    val os = ps map { 
      p => Observable from p.future observeOn NewThreadScheduler() 
     } 
    val v = rx.Observable.merge(os map toJavaObservable) 
} 

,該程序不能編譯,因爲Observable有幾個重載方法都稱merge

[error] /home/xgp/work/rxtest/src/main/scala/rxtest/A.scala:15: overloaded method value merge with alternatives: 
[error] [T](x$1: Array[rx.Observable[_ <: T]])rx.Observable[T] <and> 
[error] [T](x$1: rx.Observable[_ <: rx.Observable[_ <: T]])rx.Observable[T] <and> 
[error] [T](x$1: Iterable[_ <: rx.Observable[_ <: T]])rx.Observable[T] 
[error] cannot be applied to (Array[rx.Observable[_ <: Int]]) 
[error]  val v = rx.Observable.merge(os map toJavaObservable) 
[error]       ^
[error] one error found 

然後我想與另一個Java類的有助於消除超載:

public class RxUtils { 
    public final static <T> Observable<T> merge(Observable<? extends T>[] os) { 
     return Observable.merge(os); 
    } 
} 

的Scala代碼成爲(只有相關部分在此列出):

val ps = Array.fill(3)(Promise[Int]()) 
val os = ps map { 
     p => Observable from p.future observeOn NewThreadScheduler() 
    } 
val v = RxUtils.merge(os map toJavaObservable) 

這個程序仍然不能編譯:

[error] /home/xgp/work/rxtest/src/main/scala/rxtest/A.scala:17: no type parameters for method merge: (os: Array[rx.Observable[_ <: T]])rx.Observable[T] exist so that it can be applied to arguments (Array[rx.Observable[_ <: Int]]) 
[error] --- because --- 
[error] argument expression's type is not compatible with formal parameter type; 
[error] found : Array[rx.Observable[_ <: Int]] 
[error] required: Array[rx.Observable[_ <: ?T]] 
[error]  val v = RxUtils.merge(os map toJavaObservable) 
[error]     ^
[error] /home/xgp/work/rxtest/src/main/scala/rxtest/A.scala:17: type mismatch; 
[error] found : Array[rx.Observable[_ <: Int]] 
[error] required: Array[rx.Observable[_ <: T]] 
[error]  val v = RxUtils.merge(os map toJavaObservable) 
[error]       ^
[error] two errors found 

我有三個問題:

  1. 如何在第一種情況下使用純斯卡拉調用merge方法?
  2. 爲什麼第二個程序不能編譯?
  3. 如何調用斯卡拉上述RxUtilsmerge方法?
+0

如果您真的對Java和Scala generic的黑魔法感興趣,請檢查RxScala如何實現'flatten':https://github.com/ReactiveX/RxScala/blob/a385e1a474a05af5173d3a6c5f380b0f87b50dff/src/main/scala/rx/lang /scala/Observable.scala#L2669 – zsxwing

回答

2

我與你在這裏做什麼真的很困惑。爲什麼要混合rx.Observablerx.lang.scala.Observable。只要選擇其中一個:如果你在斯卡拉工作,選擇後者;如果您正在編寫Java代碼,請選擇前者!

我也想你指向this page,這兩種類型的Observable進行比較。

關於你的第一個程序,如果我是正確的ps類型爲Array[Promise[Int]],所以os必須有類型Array[Observable[Int]]。如果您想將它們合併爲一個Observable,您可以按照上面的鏈接在左欄中搜索merge(Array<Observable<? extends T>>)。事實證明,你可以在Scala中編寫這個文件,如Observable.from(os).flattenos.toObservable.flatten

關於第二個和第三個問題:我真的沒有選中此,但它可能具有與Java和Scala之間的協方差差異有關。可能通過提供一些額外的類型信息來幫助類型系統可以做到這一點。但是我想,如果你只是停留在Scala語言中,並且按照它的意思使用RxScala庫,那麼你就不必處理這樣的問題。