
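Apache Spark 2.0: incompatible return type when overriding compute in a custom DStream

I'm currently trying to implement my own Apache Spark v2.0 DStream in my Eclipse environment: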

import org.apache.spark.streaming.dstream.DStream 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.rdd.RDD 
import org.apache.spark.streaming.Time 

class MQTTDStream[T](ssc: StreamingContext) extends DStream(ssc) { 
    override def compute(validTime: Time): RDD[Int] = {  ssc.sparkContext.parallelize(Array(1, 2, 3), 1) } 
} 

The compiler accepts this in Eclipse. But when I paste the code into a Jupyter notebook on IBM DSExperience, I get the following error:

Name: Compile Error
Message: :21: error: overriding method compute in class DStream of type (validTime: org.apache.spark.streaming.Time)Option[org.apache.spark.rdd.RDD[Nothing]];
 method compute has incompatible type
    override def compute(validTime: Time): RDD[Int] = { ssc.sparkContext.parallelize(Array(1, 2, 3), 1) }
                 ^
:20: error: class MQTTDStream needs to be abstract, since:
it has 2 unimplemented members.
/** As seen from class MQTTDStream, the missing signatures are as follows.
 *  For convenience, these are usable as stub implementations.
 */
  def dependencies: List[org.apache.spark.streaming.dstream.DStream[_]] = ???
  def slideDuration: org.apache.spark.streaming.Duration = ???

class MQTTDStream[T](ssc: StreamingContext) extends DStream(ssc) {
      ^
StackTrace:

Edit: Aug 31 '16

I've made some progress now:

abstract class MQTTDStream[T](ssc: StreamingContext) extends DStream(ssc) { 
    override def compute(validTime: Time): Option[RDD[T]] = 
    Some(ssc.sparkContext.parallelize(Seq(1, 2, 3), 1)) 

    override def dependencies = Nil 

    override def slideDuration = Seconds(1) // just an example 
} 

This gives me:

type mismatch; found : Int(1) required: T
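The mismatch is reported because MQTTDStream is generic in T, yet compute builds its RDD from the Int literals 1, 2, 3, which the compiler cannot treat as a T. If the class really should stay generic, one option is to let the caller supply the elements. The sketch below assumes a hypothetical data: Seq[T] constructor parameter (not part of the original code) and adds a ClassTag context bound, since DStream[T] itself requires an implicit ClassTag[T], as does SparkContext.parallelize. It reaches the SparkContext through DStream's public context method rather than the constructor parameter.

```scala
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext, Time}
import org.apache.spark.streaming.dstream.DStream

// Sketch only: `data` is a hypothetical parameter that supplies the elements of type T.
class MQTTDStream[T: ClassTag](ssc: StreamingContext, data: Seq[T])
    extends DStream[T](ssc) {

  // Every batch yields an RDD built from the same `data`, typed as RDD[T].
  override def compute(validTime: Time): Option[RDD[T]] =
    Some(context.sparkContext.parallelize(data, 1))

  // This stream has no parent DStreams.
  override def dependencies: List[DStream[_]] = Nil

  override def slideDuration: Duration = Seconds(1) // just an example
}
```

With this shape, new MQTTDStream(ssc, Seq("a", "b")) is a DStream[String], and the element type is fixed by the caller instead of by the class body.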


'Option[org.apache.spark.rdd.RDD[Nothing]]' smells wrong. Are you sure all dependencies are loaded correctly in Jupyter? And that it uses the same Spark version?
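One quick way to follow up on that is to print the versions each environment actually runs, once in Eclipse and once in the notebook, and compare. This sketch reads org.apache.spark.SPARK_VERSION and scala.util.Properties.versionNumberString; the EnvCheck object and its summary method are illustrative names only.

```scala
object EnvCheck {
  // Spark version on the classpath, plus the Scala library version on the runtime classpath
  def summary(): String =
    s"Spark ${org.apache.spark.SPARK_VERSION}, Scala ${scala.util.Properties.versionNumberString}"

  def main(args: Array[String]): Unit = println(summary())
}
```

In a notebook that already provides a SparkContext as sc, sc.version reports the same Spark version.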

Answer

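  1. You are missing the type parameter for DStream (that's where the Nothing in the error message comes from);
  2. compute should return Option[RDD[Something]], not just RDD[Something];
  3. You also need to define dependencies and slideDuration.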

So, at a minimum, the code would change to:

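import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext, Time}
import org.apache.spark.streaming.dstream.DStream

class MQTTDStream[T](ssc: StreamingContext) extends DStream[Int](ssc) {
    // Must return Option[RDD[Int]] to match the signature in DStream[Int]
    override def compute(validTime: Time): Option[RDD[Int]] =
        Some(ssc.sparkContext.parallelize(Array(1, 2, 3), 1))

    // No parent DStreams
    override def dependencies: List[DStream[_]] = Nil

    override def slideDuration: Duration = Seconds(1) // just an example
}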
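For a custom source such as MQTT, an alternative worth considering (a suggestion, not part of the answer above) is to extend InputDStream instead of DStream. InputDStream already implements dependencies (no parents) and slideDuration (the context's batch interval), so only start, stop and compute remain, and constructing one registers it with the context as an input stream. The class name FixedIntInputDStream is hypothetical, and the fixed 1, 2, 3 payload only mirrors the answer.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// Hypothetical example class: emits the same three integers on every batch.
class FixedIntInputDStream(ssc: StreamingContext) extends InputDStream[Int](ssc) {

  // Nothing to open or release in this sketch; a real MQTT source
  // would connect and disconnect its client here.
  override def start(): Unit = {}
  override def stop(): Unit = {}

  override def compute(validTime: Time): Option[RDD[Int]] =
    Some(context.sparkContext.parallelize(Seq(1, 2, 3), 1))
}
```

For push-based sources, the Spark Streaming programming guide also describes a custom Receiver plugged in through ssc.receiverStream, which handles data arriving between batches.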

Now I get: overriding method compute in class DStream of type (validTime: org.apache.spark.streaming.Time)Option[org.apache.spark.rdd.RDD[Nothing]]; method compute has incompatible type (in MQTTDStream.scala)


Did you forget the '[Int]' in 'DStream[Int]'?
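The Nothing in that error comes from Scala's type inference. When extends DStream(ssc) omits the type argument, none of the constructor arguments mention T, so the compiler infers T = Nothing and supplies the implicit ClassTag[Nothing]. The Spark-free sketch below reproduces this with a hypothetical Base class that, like DStream, has a ClassTag context bound.

```scala
import scala.reflect.ClassTag

// Hypothetical stand-in for DStream[T: ClassTag](ssc: StreamingContext)
abstract class Base[T: ClassTag](name: String) {
  // Runtime class recorded by the ClassTag that was inferred for T
  def elementClass: Class[_] = implicitly[ClassTag[T]].runtimeClass
}

// Type argument omitted, as in `extends DStream(ssc)`: T is inferred as Nothing
class Omitted(name: String) extends Base(name)

// Type argument given, as in `extends DStream[Int](ssc)`: T is Int
class Explicit(name: String) extends Base[Int](name)

object InferenceDemo {
  def main(args: Array[String]): Unit = {
    println(new Omitted("a").elementClass.getName)  // scala.runtime.Nothing$
    println(new Explicit("b").elementClass.getName) // int
  }
}
```

So any override written against DStream(ssc) is checked against Option[RDD[Nothing]], which no RDD[Int] can satisfy.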