I have a file in which each line contains a (Stringx, Stringy) pair, and I am working with Apache Spark in Scala.

I want to find the number of occurrences of Stringy across the entire dataset. The code I have managed so far is:

val file = sc.textFile("s3n://bucket/test.txt") // RDD[ String ] 
val splitRdd = file.map(line => line.split("\t"))  
    // RDD[ Array[ String ] ] 
val yourRdd = splitRdd.flatMap(arr => { 
     val title = arr(0) 
     val text = arr(1) 
     val words = text.split(" ") 
     words.map(word => (word, title)) 
    }) 
    // RDD[ (String, String) ] 

scala> val s = yourRdd.map(word => ((word, scala.math.log(N/(file.filter(_.split("\t")(1).contains(word.split(",")(1))).count))))) 
<console>:31: error: value split is not a member of (String, String) 
     val s = yourRdd.map(word => ((word, scala.math.log(N/(file.filter(_.split("\t")(1).contains(word.split(",")(1))).count))))) 

Here N = 20 (a fixed value). How should I fix this problem?

UPDATE

Implementing Brendan's suggestion:

scala> val s = yourRdd.map(word => (word, scala.math.log(N/file.filter(_.split("\t")(1).contains(word._1.split(",")(1))).count))) 
s: org.apache.spark.rdd.RDD[((String, String), Double)] = MapPartitionsRDD[18] at map at <console>:33 

scala> s.first() 
15/04/23 15:43:44 INFO SparkContext: Starting job: first at <console>:36 
15/04/23 15:43:44 INFO DAGScheduler: Got job 16 (first at <console>:36) with 1 output partitions (allowLocal=true) 
15/04/23 15:43:44 INFO DAGScheduler: Final stage: Stage 17(first at <console>:36) 
15/04/23 15:43:44 INFO DAGScheduler: Parents of final stage: List() 
15/04/23 15:43:44 INFO DAGScheduler: Missing parents: List() 
15/04/23 15:43:44 INFO DAGScheduler: Submitting Stage 17 (MapPartitionsRDD[18] at map at <console>:33), which has no missing parents 
15/04/23 15:43:44 INFO MemoryStore: ensureFreeSpace(11480) called with curMem=234927, maxMem=277842493 
15/04/23 15:43:44 INFO MemoryStore: Block broadcast_18 stored as values in memory (estimated size 11.2 KB, free 264.7 MB) 
15/04/23 15:43:44 INFO MemoryStore: ensureFreeSpace(5713) called with curMem=246407, maxMem=277842493 
15/04/23 15:43:44 INFO MemoryStore: Block broadcast_18_piece0 stored as bytes in memory (estimated size 5.6 KB, free 264.7 MB) 
15/04/23 15:43:44 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on localhost:59043 (size: 5.6 KB, free: 264.9 MB) 
15/04/23 15:43:44 INFO BlockManagerMaster: Updated info of block broadcast_18_piece0 
15/04/23 15:43:44 INFO SparkContext: Created broadcast 18 from broadcast at DAGScheduler.scala:839 
15/04/23 15:43:44 INFO DAGScheduler: Submitting 1 missing tasks from Stage 17 (MapPartitionsRDD[18] at map at <console>:33) 
15/04/23 15:43:44 INFO TaskSchedulerImpl: Adding task set 17.0 with 1 tasks 
15/04/23 15:43:44 INFO TaskSetManager: Starting task 0.0 in stage 17.0 (TID 22, localhost, PROCESS_LOCAL, 1301 bytes) 
15/04/23 15:43:44 INFO Executor: Running task 0.0 in stage 17.0 (TID 22) 
15/04/23 15:43:44 INFO HadoopRDD: Input split: file:/home/ec2-user/input/OUTPUT/temp:0+128629 
15/04/23 15:43:44 ERROR Executor: Exception in task 0.0 in stage 17.0 (TID 22) 
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063. 
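
For reference, the rule described by SPARK-5063 can be illustrated with a minimal sketch (the RDD names below are hypothetical, not taken from the code above): an action on one RDD cannot run inside a transformation of another, but a plain value computed on the driver can be captured by the closure.

// Invalid: the count() action on otherRdd runs inside someRdd's map (SPARK-5063)
// val bad = someRdd.map(x => otherRdd.count() * x)

// Valid: run the action on the driver first, then close over the resulting value
val total = otherRdd.count()             // driver-side action, returns a Long
val scaled = someRdd.map(x => total * x) // the closure only captures a plain Long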

The immediate problem is that the element type of 'yourRdd' is '(String, String)', so in 'yourRdd.map(word => ...)' the type of 'word' is '(String, String)', which has no split method. It also doesn't work because you are trying to map over the 'file' RDD inside another transformation, and that is not supported. What are you actually trying to do? This looks like TF-IDF, right? – maasg


@maasg Yes! You are right! But I would like to do it without using MLlib. – AngryPanda


@maasg: That's great. How did you arrive at that conclusion? – AngryPanda

Answers


It is looking for the value 'split' on word (which would be a 'def split' member). 'word', however, is not a String; it is a (String, String), and tuples do not have a split method. I believe you meant to do word._1.split(",")(0), so the command becomes:

val s = yourRdd.map(word => (word, scala.math.log(N/file.filter(_.split("\t")(1).contains(word._1.split(",")(1))).count))) 

EDIT:

With the clarity maasg's answer gives to the real underlying problem, I see that what's needed is to count the unique instances of each word per title. I would upvote maasg's answer, but I don't have enough rep yet :(

import org.apache.spark.rdd.RDD

val sc = SparkApplicationContext.coreCtx 
val N = 20 
var rdd: RDD[String] = sc.parallelize(Seq("t1\thi how how you,you", "t1\tcat dog,cat,mouse how you,you")) 
val splitRdd: RDD[Array[String]] = rdd.map(line => line.split("\t")) 

//Unique words per title, then reduced by word into a per-word document count
val wordCountRdd = splitRdd.flatMap(arr => 
    arr(1).split(" |,").distinct //Splitting on commas too, since you seem to split on them later, but I don't think you actually need to 
    .map(word => (word, 1)) 
).reduceByKey{case (cumm, one) => cumm + one} 

val s: RDD[(String, Double)] = wordCountRdd.map{ case (word, freq) => (word, scala.math.log(N.toDouble/freq)) } 
s.collect().foreach(x => println(x._1 + ", " + x._2)) 
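
With the sample data above and N = 20, collecting s should print something close to the following (the ordering returned by collect is not guaranteed):

hi, 2.995732273553991 
how, 2.302585092994046 
you, 2.302585092994046 
cat, 2.995732273553991 
dog, 2.995732273553991 
mouse, 2.995732273553991 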

I parameterised it with word(0), but (String, String) does not take parameters. – AngryPanda


You are right, editing now. – Brendan


This does not work. 'file' is an RDD, and an RDD cannot be used inside a closure because no serialization is defined for RDDs. – maasg


As mentioned in the comments, it is not possible to use an RDD 'nested' inside the closure of another RDD operation. This calls for a change of strategy. Assuming that each title is unique, and trying to stay along the same lines as the original question, this could be an alternative that removes the need for the nested RDD computation:

val N = 20 // fixed value, as stated in the question
val file = sc.textFile("s3n://bucket/test.txt") // RDD[ String ] 
val wordByTitle = file.flatMap{line => 
    val split = line.split("\t") 
    val title = split(0) 
    val words = split(1).split(" ") 
    words.map(w=> (w,title)) 
} 

// we want the count of documents in which a word appears, 
// this is equivalent to counting distinct (word, title) combinations. 
// note that replacing the title by a hash would save considerable memory 
val uniqueWordPerTitle = wordByTitle.distinct() 

// now we can calculate the word frequency across documents 
val tdf = uniqueWordPerTitle.map{case (w, title) => (w,1)}.reduceByKey(_ + _) 

// and the inverse document frequency per word. 
val idf = tdf.map{case (word, freq) => (word, scala.math.log(N.toDouble/freq))}
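
Since the thread identifies this as TF-IDF, a possible last step is sketched below. It assumes the per-document term frequency is simply the raw word count within each title; the 'tf' and 'tfIdf' names are made up for illustration and are not part of the original answer.

// term frequency: occurrences of each word within each title
val tf = wordByTitle.map{case (w, title) => ((w, title), 1)}.reduceByKey(_ + _) 

// attach each word's inverse document frequency and multiply
val tfIdf = tf.map{case ((w, title), count) => (w, (title, count))} 
    .join(idf) 
    .map{case (w, ((title, count), i)) => ((w, title), count * i)} 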