2017-03-31 19 views
7

我使用DirectKafkaStream API 1從卡夫卡讀取數據,進行一些轉換,更新計數,然後將數據寫回卡夫卡。其實這個代碼和平是在測試:Spark DStream使用轉換週期性地調用saveAsObjectFile不能按預期工作

kafkaStream[Key, Value]("test") 
     .map(record => (record.key(), 1)) 
     .updateStateByKey[Int](
     (numbers: Seq[Int], state: Option[Int]) => 
      state match { 
      case Some(s) => Some(s + numbers.length) 
      case _ => Some(numbers.length) 
      } 
    ) 
     .checkpoint(this)("count") { 
     case (save: (Key, Int), current: (Key, Int)) => 
      (save._1, save._2 + current._2) 
     } 
     .map(_._2) 
     .reduce(_ + _) 
     .map(count => (new Key, new Result[Long](count.toLong))) 
     .toKafka(Key.Serializer.getClass.getName, Result.longKafkaSerializer.getClass.getName) 

checkpoint運算符是一個濃縮的DStream API我已經創建,應該使用saveAsObjectFile幾乎保存給出一個TimeDStream的一個RDD到HDFS。實際上,它將每第60個微量批次(RDD)的結果保存到HDFS中。

檢查站執行以下操作:

def checkpoint(processor: Streaming)(name: String)(
mergeStates: (T, T) => T): DStream[T] = { 
val path = processor.configuration.get[String](
    "processing.spark.streaming.checkpoint-directory-prefix") + "/" + 
    Reflection.canonical(processor.getClass) + "/" + name + "/" 
logInfo(s"Checkpoint base path is [$path].") 

processor.registerOperator(name) 

if (processor.fromCheckpoint && processor.restorationPoint.isDefined) { 
    val restorePath = path + processor.restorationPoint.get.ID.stringify 
    logInfo(s"Restoring from path [$restorePath].") 
    checkpointData = context.objectFile[T](restorePath).cache() 

    stream 
    .transform((rdd: RDD[T], time: Time) => { 
     val merged = rdd 
     .union(checkpointData) 
     .map[(Boolean, T)](record => (true, record)) 
     .reduceByKey(mergeStates) 
     .map[T](_._2) 

     processor.maybeCheckpoint(name, merged, time) 

     merged 
    } 
) 
} else { 
    stream 
    .transform((rdd: RDD[T], time: Time) => { 
     processor.maybeCheckpoint(name, rdd, time) 

     rdd 
    }) 
} 
} 

的有效的代碼如下:

dstream.transform((rdd: RDD[T], time: Time) => { 
     processor.maybeCheckpoint(name, rdd, time) 

     rdd 
    }) 

如果上述代碼dstream變量是先前的操作者,這是updateStateByKey的結果,因此在updateStateByKey之後立即調用變換。

def maybeCheckpoint(name: String, rdd: RDD[_], time: Time) = { 
    if (doCheckpoint(time)) { 
    logInfo(s"Checkpointing for operator [$name] with RDD ID of [${rdd.id}].") 
    val newPath = configuration.get[String](
    "processing.spark.streaming.checkpoint-directory-prefix") + "/" + 
    Reflection.canonical(this.getClass) + "/" + name + "/" + checkpointBarcode 
    logInfo(s"Saving new checkpoint to [$newPath].") 
    rdd.saveAsObjectFile(newPath) 
    registerCheckpoint(name, Operator(name), time) 
    logInfo(s"Checkpoint completed for operator [$name].") 
    } 
} 

正如你看到的大部分代碼只是記賬,但saveAsObjectFile有效地叫。

問題是即使從updateStateByKey產生的RDD應該自動保留,當每個第X個微批次調用saveAsObjectFile時,Spark將重新計算從頭開始,從流式作業開始時開始的所有事情通過再次閱讀卡夫卡的一切。我試圖在DStream和RDDs上放置並強制使用不同級別的存儲器,cachepersist

微批次:

​​

DAG作業22:

DAG for job 22

DAG作業運行saveAsObjectFile

SAOF1 SAOF2

可能是什麼問題?

謝謝!

1使用Spark 2.1.0。

+0

這不是'saveAsObjectFile'在這裏是壞男孩,一個簡單的'count'做同樣的事情, – Dyin

+0

通過調用'saveAsObjectFile'告訴spark執行一個動作,它執行初始流上定義的所有轉換(記住:轉化是懶惰的)。之後執行其他操作時(例如'reduce'),將再次執行相同的轉換。爲了防止你可以在第一個動作之前調用'dstream.cache()'(即'checkpoint(this)(...)') –

+0

@AdiGerber我已經試過了,當然了。就在檢查點豐富之前,緩存甚至試圖堅持使用磁盤存儲,以確保只有內存的存儲級別不會強迫RDD被驅逐。無論如何,我只會「重複使用」最後一次緩存的RDD,它的大小隻有4KB - 用少量的數據進行測試 - 基數爲10的300.000個鍵值記錄.10個鍵實際上由'updateStateByKey'存儲。 – Dyin

回答

2

我相信使用transform定期檢查點會導致意外的緩存行爲。

改爲使用foreachRDD來執行定期檢查點將允許DAG保持足夠穩定以有效地緩存RDD。

我幾乎是積極的,這是我們前一段時間類似問題的解決方案。

+0

點檢查會削減線性圖(當您在DStream上調用它時),但仍然無法幫助我。問題不在於圖形圖很寬的事實。 'updateStateByKey'創建一個'StateDStream',當你仔細研究它時,你會發現它默認保存在內存中,實際上每個RDD都被保存到內存中。實際上,我在那裏做的是我在這樣一個RDD上調用'count',它應該由緩衝區RDD ID從BlockManager中提取出來,而不是重新計算它。我會嘗試你的解決方案,以防萬一,但我認爲它不會幫助我。 – Dyin

+0

請做,並且讓我們知道,消除低垂果實是一個好主意! :) – ImDarrenG

+0

話雖如此,我認爲你是對的。令人討厭的是我確定我以前見過一個非常類似的問題,但不記得我們如何解決它。 – ImDarrenG