問題在手 寫了一個嘗試改進的雙向生成器工作線路,考慮到完全停止等。結果如想。它不使用mapPartition,但是按照下面的說明。SPARK N-grams&並行化不使用mapPartitions
import org.apache.spark.mllib.rdd.RDDFunctions._
val wordsRdd = sc.textFile("/FileStore/tables/natew5kh1478347610918/NGram_File.txt",10)
val wordsRDDTextSplit = wordsRdd.map(line => (line.trim.split(" "))).flatMap(x => x).map(x => (x.toLowerCase())).map(x => x.replaceAll(",{1,}","")).map(x => x.replaceAll("!
{1,}",".")).map(x => x.replaceAll("\\?{1,}",".")).map(x => x.replaceAll("\\.{1,}",".")).map(x => x.replaceAll("\\W+",".")).filter(_ != ".")filter(_ != "")
val x = wordsRDDTextSplit.collect() // need to do this due to lazy evaluation etc. I think, need collect()
val y = for (Array(a,b,_*) <- x.sliding(2).toArray)
yield (a, b)
val z = y.filter(x => !(x._1 contains ".")).map(x => (x._1.replaceAll("\\.{1,}",""), x._2.replaceAll("\\.{1,}","")))
我有一些問題:
結果如預期。沒有數據是錯過的。但是,我可以將這種方法轉換爲mapPartitions方法嗎?我會不會失去一些數據?許多人認爲這是由於我們將要處理的分區具有所有單詞的子集,因此在分割的邊界(即下一個和前一個單詞)中缺少關係。對於大文件分割,我可以從地圖角度看到這也可能發生。正確?然而,如果你看看上面的代碼(沒有mapPartitions嘗試),它總是能夠工作,不管我對它進行了多少並行化處理,10或100個分區中的字符在不同分區上是連續的。我用mapPartitionsWithIndex檢查了這個。這我不清楚。好的,對(x,y)=> x + y的減少是很好理解的。
在此先感謝。我必須在這一切中忽略一些基本點。
輸出&結果 Z:數組[(字符串,字符串)] =陣列((你好,如何),(如何,是),(是,你),(你今天),(I, ),(會,會),(會,會),(會,會),(會,會),(會,會),(會,會) ,(你,約),(約),(),(貓),(他,是),(是,不),(不),(做,如此),(如此,好),(什麼,應該),(應該,我們),(我們,做),(請,幫助),(幫助,我),(嗨,那裏),(那裏,ged))012g16amapped:org.apache.spark。 rdd.RDD [字符串] = MapPartitionsRDD [669]在mapPartitionsWithIndex於:123
分區分配 res13: Array [String] = Array(hello - > 0,how - > 0, - > 0,你 - > 0,今天。 - > 0,i - > 0,am - > 32,fine - > 32,但 - > 32, - > 32,like - > 32, - > 32,talk - > 60, - > 60,you - > 60,約 - > 60, - > 60,貓。 - > 60,他 - > 60,是 - > 60,不 - > 96,做 - > 96,所以 - > 96。 - > 96,什麼 - > 96,應該 - > 122,我們 - > 122,做。 - > 122,請 - > 122,幫助 - > 122,我。 - > 122,嗨 - > 155,那裏 - > 155,ged。 - > 155)
可能是SPARK真的很聰明,比我想象的要聰明。或者可能不是?在分區保存上看到了一些東西,有些與imho相矛盾。
map vs mapValues的意思是前破壞分區和單分區處理?
滑動將分區考慮在內sc ...串行或並行處理或混合不完全清晰。 – thebluephantom