2015-09-18 58 views
1

運行Windows 8.1,Java的1.8,斯卡拉2.10.5,星火1.4.1,斯卡拉IDE(Eclipse的4.4),IPython的3.0.0和Jupyter Scala故障排除斯卡拉星火配置/環境

我是比較新的Scala和Spark和我看到這裏一定RDD命令,如收集和先返回「任務不序列化」錯誤的問題。什麼是不尋常的對我來說是我看到的錯誤在IPython的筆記本電腦與斯卡拉內核或斯卡拉IDE。但是,當我直接在spark-shell中運行代碼時,我不會收到此錯誤。

我想設置爲超過殼更先進的編碼評價這兩種環境。我幾乎沒有什麼專業知識來解決這類問題並確定要尋找什麼;如果您可以提供有關如何開始解決此類問題的指導,我們將不勝感激。

代碼:

val logFile = "s3n://[key:[key secret]@mortar-example-data/airline-data" 
val sample = sc.parallelize(sc.textFile(logFile).take(100).map(line => line.replace("'","").replace("\"","")).map(line => line.substring(0,line.length()-1))) 
val header = sample.first 
val data = sample.filter(_!= header) 
data.take(1) 
data.count 
data.collect 

堆棧跟蹤

org.apache.spark.SparkException: Task not serializable 
    org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315) 
    org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305) 
    org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132) 
    org.apache.spark.SparkContext.clean(SparkContext.scala:1893) 
    org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311) 
    org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310) 
    org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
    org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) 
    org.apache.spark.rdd.RDD.withScope(RDD.scala:286) 
    org.apache.spark.rdd.RDD.filter(RDD.scala:310) 
    cmd49$$user$$anonfun$4.apply(Main.scala:188) 
    cmd49$$user$$anonfun$4.apply(Main.scala:187) 
java.io.NotSerializableException: org.apache.spark.SparkConf 
Serialization stack: 
    - object not serializable (class: org.apache.spark.SparkConf, value: [email protected]) 
    - field (class: cmd12$$user, name: conf, type: class org.apache.spark.SparkConf) 
    - object (class cmd12$$user, [email protected]) 
    - field (class: cmd49, name: $ref$cmd12, type: class cmd12$$user) 
    - object (class cmd49, [email protected]) 
    - field (class: cmd49$$user, name: $outer, type: class cmd49) 
    - object (class cmd49$$user, [email protected]) 
    - field (class: cmd49$$user$$anonfun$4, name: $outer, type: class cmd49$$user) 
    - object (class cmd49$$user$$anonfun$4, <function0>) 
    - field (class: cmd49$$user$$anonfun$4$$anonfun$apply$3, name: $outer, type: class cmd49$$user$$anonfun$4) 
    - object (class cmd49$$user$$anonfun$4$$anonfun$apply$3, <function1>) 
    org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
    org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
    org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81) 
    org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312) 
    org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305) 
    org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132) 
    org.apache.spark.SparkContext.clean(SparkContext.scala:1893) 
    org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311) 
    org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310) 
    org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
    org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) 
    org.apache.spark.rdd.RDD.withScope(RDD.scala:286) 
    org.apache.spark.rdd.RDD.filter(RDD.scala:310) 
    cmd49$$user$$anonfun$4.apply(Main.scala:188) 
    cmd49$$user$$anonfun$4.apply(Main.scala:187) 
+1

你爲什麼要使用sc.parallelize內sc.textFile?!? – eliasah

回答

0

sc.textFile已經創建分佈式數據集(查看文檔)。你不需要sc.parallelize在這種情況下,但是 - 作爲eliasah正確指出的 - 你需要再次打開結果爲RDD,如果你想有一個RDD。

val selection = sc.textFile(logFile). // RDD 
take(100). // collection 
map(_.replaceAll("['\"]",""). // use regex to match both chars 
map(_.init) // a method that returns all elements except the last 
// turn the resulting collection into RDD again 
val sample = sc.parallelize(selection) 
+0

樣本不是RDD在這裏想到的! – eliasah

+0

爲什麼?textFile應該創建一個RDD不是嗎? – Ashalynd

+0

是的,然後取(100)返回你正在映射的數組[字符串]。所以,你只需要一個你已經應用了replaceAll和init函數的字符串數組。 – eliasah

1

@Ashalynd對於sc.textFile已經創建和RDD的事實是正確的。在這種情況下,你不需要sc.parallelize。 documentation here

因此,考慮你的榜樣,這就是你需要做的:

// Read your data from S3 
val logFile = "s3n://[key:[key secret]@mortar-example-data/airline-data" 
val rawRDD = sc.textFile(logFile) 

// Fetch the header 
val header = rawRDD.first 

// Filter on the header than map to clean the line 
val sample = rawRDD.filter(!_.contains(header)).map { 
line => line.replaceAll("['\"]","").substring(0,line.length()-1) 
}.takeSample(false,100,12L) // takeSample returns a fixed-size sampled subset of this RDD in an array 

這是更好地使用takeSample功能:

高清takeSample(withReplacement:布爾,民:中等,種子:長= Utils.random.nextLong):數組[T]

withReplacement:返回的樣品的大小

種子:採樣是否與更換

NUM完成種子的隨機數發生器

注1:所述樣品是數組[String],因此如果您想將其轉換爲RDD,則可以使用parallelize函數,如下所示:

val sampleRDD = sc.parallelize(sample.toSeq) 

注2:如果你想直接從您的rawRDD.filter(...).map(...)採取抽樣RDD,您可以使用sample函數返回一個RDD [T]。不過,您需要指定所需數據的一部分而不是特定的數字。

+0

'val header = sample.first'雖然不會工作,因爲'樣本'在那個點上還不存在? – Ashalynd

+0

我的不好。這應該是rawRDD.first – eliasah

+0

,但您的標題然後不同於從樣本製作的標題。樣本清除數據,將每個數據元素的長度減1,然後提取頭並根據該頭過濾數據。取不等於未處理的第一個元素的原始數據可能不會產生相同的集合。 – Ashalynd