
The job hangs at the following line on the second iteration; the Scala Spark code never completes the second pass:

val wordCountWithLabelsCollect = wordCountWithLabels.collect 

This is the Scala output:

15/06/19 15:49:33 INFO DAGScheduler: Submitting Stage 1 (MappedValuesRDD[3] at mapValues at Ques.scala:33), which has no missing parents 
15/06/19 15:49:33 INFO MemoryStore: ensureFreeSpace(2480) called with curMem=2219, maxMem=1030823608 
15/06/19 15:49:33 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.4 KB, free 983.1 MB) 
15/06/19 15:49:33 INFO MemoryStore: ensureFreeSpace(1812) called with curMem=4699, maxMem=1030823608 
15/06/19 15:49:33 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1812.0 B, free 983.1 MB) 
15/06/19 15:49:33 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:54590 (size: 1812.0 B, free: 983.1 MB) 
15/06/19 15:49:33 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0 
15/06/19 15:49:33 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838 
15/06/19 15:49:33 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (MappedValuesRDD[3] at mapValues at Ques.scala:33) 
15/06/19 15:49:33 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks 

The Scala code:

import org.apache.spark.SparkContext 
import org.apache.spark.rdd.RDD 
import org.apache.spark.SparkContext._ 
import org.apache.log4j.Logger 
import org.apache.log4j.Level 

object Ques extends App {

  val data = getUncategorisedData

  data.foreach(document => {
    runner
  })

  case class Document(label: String, text: String)

  def reduceList(list: List[(String, Int)]) = list.groupBy(_._1).mapValues(_.aggregate(0)(_ + _._2, _ + _))

  def runner = {
    val trainingData = getSC.parallelize(
      List(
        Document("sport", "this is text for document spor a"),
        Document("sport", "this spor is text for document spor b"),
        Document("news", "this is such a new categorise data")))

    val counts: org.apache.spark.rdd.RDD[(String, List[(String, Int)])] = trainingData.map(doc => ((doc.label, doc.text.split(" ").toList.map(w => (w, 1)))))

    val mergedList = counts.mapValues((list: List[(String, Int)]) => reduceList(list).toList)

    val wordCountWithLabels: org.apache.spark.rdd.RDD[(String, List[(String, Int)])] = mergedList.reduceByKey((accum: List[(String, Int)], value: List[(String, Int)]) => {
      val valueMap = value.toMap
      val accumMap = accum.toMap
      val mergedMap = accumMap ++ valueMap.map { case (k, v) => k -> (v + accumMap.getOrElse(k, 0)) }
      mergedMap.toList
    })

    val wordCountWithLabelsCollect = wordCountWithLabels.collect
    wordCountWithLabels.collect
  }

  def getUncategorisedData: RDD[Document] = {
    lazy val trainingData = getSC.parallelize(
      List(
        Document("", "this is text for document a"),
        Document("", "this is text for document b"),
        Document("", "this is text for for document c")))

    trainingData
  }

  lazy val getSC = {

    val conf = new org.apache.spark.SparkConf()
      .setMaster("local")
      .setAppName("process")
      .setSparkHome("C:\\spark-1.1.0-bin-hadoop2.4\\spddark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4")
      .set("spark.executor.memory", "3g")
      .set("deploy-mode", "standalone")
      .set("SPARK_CONF_DIR", "c:\\data\\sparkConfig")

    val sc = new SparkContext(conf)

    sc
  }
}

What is going wrong here?

Calling collect multiple times on the same collection should not be a problem, should it?

If I call runner twice in a row instead:

runner 
runner 

then it terminates.

Update: the same behaviour occurs with a simpler example:

import org.apache.spark.SparkContext 
import org.apache.spark.rdd.RDD 
import org.apache.spark.SparkContext._ 
import org.apache.log4j.Logger 
import org.apache.log4j.Level 
import org.apache.spark.rdd.PairRDDFunctions 

object Ques extends App {

  val conf = new org.apache.spark.SparkConf()
    .setMaster("local")
    .setAppName("process")
    .setSparkHome("C:\\spark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4")
    .set("spark.executor.memory", "3g")

  val sc = new SparkContext(conf)

  val data = sc.parallelize(List(""))

  val counts: org.apache.spark.rdd.RDD[(String)] = sc.parallelize(List(("")))

  data.foreach(document => {
    counts.collect
  })

}

This code never terminates either. It seems that collect cannot be called multiple times from within a foreach function?

Update 2:

I do not know why, but collecting everything back to the driver before the loop makes it terminate:

data.collect.foreach(document => {
  counts.collect
})
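
Applied to the original program, the same driver-side workaround would look roughly like this (a minimal sketch that reuses the data and runner defined above):

data.collect.foreach(document => {
  // data.collect runs on the driver, so this loop is ordinary local code
  // and runner (with the collect inside it) is always invoked driver-side
  runner
})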

Answer


I think the simple answer here is that you are trying to call an action from within a transformation. Actions return values to the driver; calling an action from inside the closure of a transformation (or of another action, such as foreach here) makes no sense, because those closures are executed by the workers, not by the driver. I cannot find any documentation that states this more explicitly than this section of the programming guide.
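
For completeness, here is a minimal sketch of the other common way around this, using sc, data and counts from the simplified example above: materialise the small RDD once on the driver and hand the resulting plain collection to the workers, for instance through a broadcast variable, so that the closure never touches an RDD.

// Minimal sketch, assuming sc, data and counts from the simplified example.
// collect runs once, on the driver; the workers only ever see an ordinary Array[String].
val countsLocal = sc.broadcast(counts.collect)

data.foreach(document => {
  val snapshot = countsLocal.value // no nested action, so the job terminates
  // ... use snapshot together with document ...
})

Either way the rule of thumb is the same: actions such as collect, count or reduce belong in driver-side code only, while the closures passed to map, foreach and friends should work purely on the values (or broadcast variables) they are given.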
