2017-04-20 81 views
0

我不知道我們是否可以測試RDD在星火。如何測試星火RDD

我讀到一篇文章,它說懲戒一個RDD是不是一個好主意。是否有任何其他方式或任何最佳實踐測試RDD的

+1

你看過Holden的[spark-test-base](https://github.com/holdenk/spark-testing-base)了嗎? – Pushkr

回答

0

有測試星火RDD /應用2種方法。它們分別是:

例如

股,以測試

import org.apache.spark.SparkContext 
import org.apache.spark.rdd.RDD 

class WordCount { 
    def get(url: String, sc: SparkContext): RDD[(String, Int)] = { 
    val lines = sc.textFile(url) lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) 
    } 
} 

現在方法1來測試其計算方法如下:

import org.scalatest.{ BeforeAndAfterAll, FunSuite } 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkConf 

class WordCountTest extends FunSuite with BeforeAndAfterAll { 
    private var sparkConf: SparkConf = _ 
    private var sc: SparkContext = _ 

    override def beforeAll() { 
    sparkConf = new SparkConf().setAppName("unit-testing").setMaster("local") 
    sc = new SparkContext(sparkConf) 
    } 

    private val wordCount = new WordCount 

    test("get word count rdd") { 
    val result = wordCount.get("file.txt", sc) 
    assert(result.take(10).length === 10) 
    } 

    override def afterAll() { 
    sc.stop() 
    } 
} 

在方法1中,我們不是在嘲笑RDD。我們只是檢查我們的WordCount類的行爲。但是我們必須自己管理SparkContext的創建和銷燬。所以,如果你不想寫額外的代碼,那麼你可以使用spark-testing-base,像這樣:

方法2

import org.scalatest.FunSuite 
import com.holdenkarau.spark.testing.SharedSparkContext 

class WordCountTest extends FunSuite with SharedSparkContext { 
    private val wordCount = new WordCount 

    test("get word count rdd") { 
    val result = wordCount.get("file.txt", sc) 
    assert(result.take(10).length === 10) 
    } 
} 

或者

import org.scalatest.FunSuite 
import com.holdenkarau.spark.testing.SharedSparkContext 
import com.holdenkarau.spark.testing.RDDComparisons 

class WordCountTest extends FunSuite with SharedSparkContext { 
    private val wordCount = new WordCount 

    test("get word count rdd with comparison") { 
    val expected = sc.textFile("file.txt") 
        .flatMap(_.split(" ")) 
        .map((_, 1)) 
        .reduceByKey(_ + _) 

    val result = wordCount.get("file.txt", sc) 

    assert(RDDComparisons.compare(expected, result).isEmpty) 
    } 
} 

對於在星火RDD測試更多詳情,請參閱本 - KnolX: Unit Testing of Spark Applications

+0

我只是一個小程序,所以我試圖使用方法:1,但是你(himanshu)在方法1中顯示的是沒有比較RDD。您正在對該RDD執行操作,然後嘗試將其與整數值相等。我想要比較2個RDD ... 可以說RDD [myClass] === RDD [myClass] – AJm

+0

爲了比較RDD(s),您應該使用方法2中提到的RDDComparisons。 – himanshuIIITian

+0

但是,使用由Some開發的定製庫,它仍在開發中,而不是像Apache這樣的大傘。它也可能不是生產準備好的。 – AJm

1

謝謝你把這個突出的問題放在那裏。出於某種原因,當談到Spark時,每個人都會深深陷入分析中,忘記過去15年左右出現的偉大軟件工程實踐。這就是爲什麼我們要在我們的課程中討論測試和持續集成(其中包括DevOps等)。

的快速除術語

我去之前,我必須表達對KnolX呈現@himanshuIIITian舉了一個小的分歧。一個真正單位測試意味着你有過在測試每個組件的完全控制。不能與數據庫,REST調用,文件系統甚至系統時鐘交互; Gerard Mezaros在xUnit Test Patterns中提到的所有內容都必須「加倍」(例如嘲笑,殘片等)。我知道這看起來像語義,但它真的很重要。未能理解這是你在持續集成中看到間歇性測試失敗的一個主要原因。

我們仍然可以單元測試

所以給出這樣的認識,單元測試的RDD是不可能的。但是,開發分析時仍然有單元測試的地方。

(注:我將使用斯卡拉的例子,但其概念超越了語言和框架。)

考慮一個簡單的操作:

rdd.map(foo).map(bar) 

這裏foobar是簡單的功能。這些可以通過正常的方式進行單元測試,並且應該儘可能多地使用角落案例。畢竟,他們爲什麼關心他們從哪裏獲得他們的投入,無論是測試夾具還是RDD

不要忘記星火殼牌

這不是測試本身,但在這些早期階段,你也應該在Spark外殼可以嘗試找出你的變換,尤其是你的方法的後果。例如,您可以檢查物理和邏輯查詢計劃,分區策略和保留以及具有許多不同功能的數據狀態,如toDebugString,explain,glom,show, printSchema等。我會讓你探索那些。

您還可以在Spark shell和您的測試中將您的主設置爲local[2],以識別一旦您開始分發工作時可能出現的任何問題。

集成測試與星火

現在到了有趣的東西。

爲了集成測試星火你覺得在你的助手職能的質量和RDD/DataFrame轉換邏輯信心後,關鍵是做了幾件事情(無論構建工具和測試框架):

  • 增加JVM內存。
  • 啓用分叉但禁用並行執行。
  • 使用您的測試框架將Spark集成測試累積到套件中,並在所有測試之前初始化SparkContext,並在所有測試之後停止測試。

有幾種方法可以做到這一點。其中一個可從spark-testing-base獲得@Pushkr和由@himanshuIIITian鏈接的KnolX演示文稿。

貸款模式

另一種方法是使用Loan Pattern

例如(使用ScalaTest):

class MySpec extends WordSpec with Matchers with SparkContextSetup { 
    "My analytics" should { 
    "calculate the right thing" in withSparkContext { (sparkContext) => 
     val data = Seq(...) 
     val rdd = sparkContext.parallelize(data) 
     val total = rdd.map(...).filter(...).map(...).reduce(_ + _) 

     total shouldBe 1000 
    } 
    } 
} 

trait SparkContextSetup { 
    def withSparkContext(testMethod: (SparkContext) => Any) { 
    val conf = new SparkConf() 
     .setMaster("local") 
     .setAppName("Spark test") 
    val sparkContext = new SparkContext(conf) 
    try { 
     testMethod(sparkContext) 
    } 
    finally sparkContext.stop() 
    } 
} 

正如你所看到的,在貸款模式利用了高階函數「貸款」的SparkContext到測試再到銷燬,它的後完成。

面向受苦編程(謝謝,內森)

這是完全喜好的問題,但我更願意把之前只要我可以使用的貸款模式和電線的東西了自己另一個框架。除了試圖保持輕量級之外,框架有時會增加很多「魔術」,使得調試測試失敗很難推理。所以我採取了Suffering-Oriented Programming的方法 - 我避免增加一個新的框架,直到沒有它的痛苦承受太多。但是,這又取決於你。

現在一個地方火花試驗基地真正的亮點是與基於Hadoop的助手一樣HDFSClusterLikeYARNClusterLike。混合這些特徵可以真正爲您節省很多設置痛苦。它發光的另一個地方是類似Scalacheck的屬性和發電機。但是,再次,我個人會堅持使用它,直到我的分析和測試達到了這種複雜程度。

集成測試與星火流

最後,我只想提出一個什麼樣的SparkStreaming集成測試設置的內存值看起來像一個片段:

val sparkContext: SparkContext = ... 
val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3")) 
val rdd: RDD[(String, String)] = sparkContext.parallelize(data) 
val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]] 
val streamingContext = new StreamingContext(sparkContext, Seconds(1)) 
val dStream: InputDStream = streamingContext.queueStream(strings) 
strings += rdd 

這比看起來更簡單。它實際上只是將一系列數據轉換爲隊列以供給DStream。其中大部分實際上只是與Spark API配合使用的樣板設置。

這可能是我有史以來最長的帖子,所以我會把它留在這裏。我希望別人與其他想法一起合作,通過相同的敏捷軟件工程實踐幫助提高分析質量,這些實踐改進了所有其他應用程序開發。

併爲無恥插頭道歉,你可以看看我們的課程Analytics with Apache Spark,我們解決了很多這些想法和更多。我們希望很快就有在線版本。