2017-01-05 150 views
2

可以DStreamtype parameter s?類型 - 參數化DS流

如果是,如何?

當我嘗試myDStream: DStream[(A, B)](類參數)lazy val qwe = mStream.mapWithState(stateSpec),我得到:

value mapWithState is not a member of org.apache.spark.streaming.dstream.DStream[(A, B)] 
    lazy val qwe = mStream.mapWithState(stateSpec) 

回答

2

星火API的基本子集需要隱ClassTags(見Scala: What is a TypeTag and how do I use it?)和PairDStreamFunctions.mapWithState是沒有什麼不同。檢查class definition

class PairDStreamFunctions[K, V](self: DStream[(K, V)]) 
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K]) 

and

def mapWithState[StateType: ClassTag, MappedType: ClassTag](
    spec: StateSpec[K, V, StateType, MappedType] 
): MapWithStateDStream[K, V, StateType, MappedType] = { 
    ... 
} 

如果想創造出運行在一個通用的對流和使用mapWithState你至少應該爲KeyTypeValueType類型提供ClassTags功能:

def foo[T : ClassTag, U : ClassTag](
    stream: DStream[(T, U)], f: StateSpec[T, U, Int, Int]) = stream.mapWithState(f) 

如果StateTypeMappedType也是參數化的,您也需要ClassTags

def bar[T : ClassTag, U : ClassTag, V : ClassTag, W : ClassTag](
    stream: DStream[(T, U)], f: StateSpec[T, U, V, W]) = stream.mapWithState(f)