2014-11-04 150 views
0
// 4 workers 
    val sc = new SparkContext("local[4]", "naivebayes") 

    // Load documents (one per line). 
    val documents: RDD[Seq[String]] = sc.textFile("/tmp/test.txt").map(_.split(" ").toSeq) 

    documents.zipWithIndex.foreach{ 
    case (e, i) => 
    val collectedResult = Tokenizer.tokenize(e.mkString) 
    } 

    val hashingTF = new HashingTF() 
    //pass collectedResult instead of document 
    val tf: RDD[Vector] = hashingTF.transform(documents) 

    tf.cache() 
    val idf = new IDF().fit(tf) 
    val tfidf: RDD[Vector] = idf.transform(tf) 
在上面的代碼片斷

,我想提取collectedResult重用它hashingTF.transform,如何才能實現這一目標,其中記號化功能的簽名是轉換斯卡拉字符串RDD [SEQ [字符串]

def tokenize(content: String): Seq[String] = { 
... 
} 

回答

1

看起來像你想map而不是foreach。我不明白你在爲什麼使用zipWithIndex,也不知道你爲什麼要在你的線路上打電話split,直接與mkString再次聯繫。

val lines: Rdd[String] = sc.textFile("/tmp/test.txt") 
val tokenizedLines = lines.map(tokenize) 
val hashes = tokenizedLines.map(hashingTF) 
hashes.cache() 
... 
+0

@Imm我該如何聲明其他rdd?對不起,我是一個新手! – Siva 2014-11-04 09:45:42

+0

你說你想把該函數的返回值追加到其他一些'RDD [Seq [String]]',否? 'otherRdd'是你想追加的。 – lmm 2014-11-04 09:54:22

+0

inp.zipWithIndex.foreach {e,i)=> val result:RDD [Seq [String]] ++ = sc.parallelize(Seq(Tokenizer.tokenize(e))) } 我想這個是錯的,我想把結果聲明爲循環,追加它並得到一個rdd。我不知道我該如何做到這一點。 – Siva 2014-11-04 10:07:00

相關問題