A task in the second iteration hangs: the Scala Spark code below never finishes its second run. It hangs at this line:
val wordCountWithLabelsCollect = wordCountWithLabels.collect
This is the Spark output at that point:
15/06/19 15:49:33 INFO DAGScheduler: Submitting Stage 1 (MappedValuesRDD[3] at mapValues at Ques.scala:33), which has no missing parents
15/06/19 15:49:33 INFO MemoryStore: ensureFreeSpace(2480) called with curMem=2219, maxMem=1030823608
15/06/19 15:49:33 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.4 KB, free 983.1 MB)
15/06/19 15:49:33 INFO MemoryStore: ensureFreeSpace(1812) called with curMem=4699, maxMem=1030823608
15/06/19 15:49:33 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1812.0 B, free 983.1 MB)
15/06/19 15:49:33 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:54590 (size: 1812.0 B, free: 983.1 MB)
15/06/19 15:49:33 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/06/19 15:49:33 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838
15/06/19 15:49:33 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (MappedValuesRDD[3] at mapValues at Ques.scala:33)
15/06/19 15:49:33 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
Scala code:
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.log4j.Logger
import org.apache.log4j.Level

object Ques extends App {

  val data = getUncategorisedData

  // Runs the word-count job once per document; the second invocation never finishes
  data.foreach(document => {
    runner
  })

  case class Document(label: String, text: String)

  def reduceList(list: List[(String, Int)]) =
    list.groupBy(_._1).mapValues(_.aggregate(0)(_ + _._2, _ + _))

  def runner = {
    val trainingData = getSC.parallelize(
      List(
        Document("sport", "this is text for document spor a"),
        Document("sport", "this spor is text for document spor b"),
        Document("news", "this is such a new categorise data")))

    val counts: org.apache.spark.rdd.RDD[(String, List[(String, Int)])] =
      trainingData.map(doc => ((doc.label, doc.text.split(" ").toList.map(w => (w, 1)))))

    val mergedList = counts.mapValues((list: List[(String, Int)]) => reduceList(list).toList)

    val wordCountWithLabels: org.apache.spark.rdd.RDD[(String, List[(String, Int)])] =
      mergedList.reduceByKey((accum: List[(String, Int)], value: List[(String, Int)]) => {
        val valueMap = value.toMap
        val accumMap = accum.toMap
        val mergedMap = accumMap ++ valueMap.map { case (k, v) => k -> (v + accumMap.getOrElse(k, 0)) }
        mergedMap.toList
      })

    // Hangs here on the second iteration
    val wordCountWithLabelsCollect = wordCountWithLabels.collect
    wordCountWithLabels.collect
  }

  def getUncategorisedData: RDD[Document] = {
    lazy val trainingData = getSC.parallelize(
      List(
        Document("", "this is text for document a"),
        Document("", "this is text for document b"),
        Document("", "this is text for for document c")))
    trainingData
  }

  lazy val getSC = {
    val conf = new org.apache.spark.SparkConf()
      .setMaster("local")
      .setAppName("process")
      .setSparkHome("C:\\spark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4")
      .set("spark.executor.memory", "3g")
      .set("deploy-mode", "standalone")
      .set("SPARK_CONF_DIR", "c:\\data\\sparkConfig")
    val sc = new SparkContext(conf)
    sc
  }
}
What is wrong here?
Calling collect multiple times on the same RDD should not be a problem, should it?
If I call runner twice in succession:
runner
runner
then it terminates.
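For what it is worth, repeated collect calls issued straight from the driver also seem fine in an isolated sketch (a hypothetical example using an assumed local SparkContext sc, not the getSC above):

// Hypothetical sketch: two successive collect calls on the same RDD, both from the driver
val words = sc.parallelize(List("a", "b", "a"))
val wordCounts = words.map(w => (w, 1)).reduceByKey(_ + _)
val first = wordCounts.collect  // first job, returns normally
val second = wordCounts.collect // second job on the same RDD, also returns normally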
UPDATE: the same behaviour with a simpler example:
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.rdd.PairRDDFunctions

object Ques extends App {

  val conf = new org.apache.spark.SparkConf()
    .setMaster("local")
    .setAppName("process")
    .setSparkHome("C:\\spark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4")
    .set("spark.executor.memory", "3g")

  val sc = new SparkContext(conf)

  val data = sc.parallelize(List(""))
  val counts: org.apache.spark.rdd.RDD[(String)] = sc.parallelize(List(("")))

  data.foreach(document => {
    counts.collect
  })
}
This code also never terminates. It seems that collect cannot be called on another RDD from inside a foreach
function?
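What does seem to work, as far as I can tell, is collecting the inner RDD on the driver first and only capturing the resulting local array inside the closure (a minimal sketch with assumed names, not the exact code above):

// Sketch with assumed names: materialise the inner RDD on the driver first,
// so the closure only captures a plain local Array and makes no Spark calls
val localCounts = counts.collect // runs on the driver
data.foreach(document => {
  val size = localCounts.length  // plain Array access inside the closure
})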
UPDATE 2:
I do not know why, but collecting everything back to the driver before the foreach makes it terminate:
data.collect.foreach(document => {
  counts.collect
})