I have a file in which each line contains a (Stringx, Stringy) pair, and I am working with Apache Spark in Scala.
I want to find the number of occurrences of Stringy across the entire dataset. The code I have managed so far is as follows:
val file = sc.textFile("s3n://bucket/test.txt")   // RDD[String]
val splitRdd = file.map(line => line.split("\t")) // RDD[Array[String]]
val yourRdd = splitRdd.flatMap(arr => {
  val title = arr(0)               // first tab-separated field: the title
  val text = arr(1)                // second field: the text body
  val words = text.split(" ")
  words.map(word => (word, title)) // one (word, title) pair per word
})
// RDD[(String, String)]
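For illustration, here is how a single hypothetical line (made up for this walk-through, not taken from the question's data) would flow through that pipeline:

val line = "doc1\thello world"   // hypothetical tab-separated (title, text) line
val arr = line.split("\t")       // Array("doc1", "hello world")
val pairs = arr(1).split(" ").map(word => (word, arr(0)))
// pairs: Array(("hello", "doc1"), ("world", "doc1"))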
scala> val s = yourRdd.map(word => ((word, scala.math.log(N/(file.filter(_.split("\t")(1).contains(word.split(",")(1))).count)))))
<console>:31: error: value split is not a member of (String, String)
val s = yourRdd.map(word => ((word, scala.math.log(N/(file.filter(_.split("\t")(1).contains(word.split(",")(1))).count)))))
Here N = 20 (a fixed value). How should I fix this?
UPDATE
Implementing Brandon's comment:
scala> val s = yourRdd.map(word => (word, scala.math.log(N/file.filter(_.split("\t")(1).contains(word._1.split(",")(1))).count)))
s: org.apache.spark.rdd.RDD[((String, String), Double)] = MapPartitionsRDD[18] at map at <console>:33
scala> s.first()
15/04/23 15:43:44 INFO SparkContext: Starting job: first at <console>:36
15/04/23 15:43:44 INFO DAGScheduler: Got job 16 (first at <console>:36) with 1 output partitions (allowLocal=true)
15/04/23 15:43:44 INFO DAGScheduler: Final stage: Stage 17(first at <console>:36)
15/04/23 15:43:44 INFO DAGScheduler: Parents of final stage: List()
15/04/23 15:43:44 INFO DAGScheduler: Missing parents: List()
15/04/23 15:43:44 INFO DAGScheduler: Submitting Stage 17 (MapPartitionsRDD[18] at map at <console>:33), which has no missing parents
15/04/23 15:43:44 INFO MemoryStore: ensureFreeSpace(11480) called with curMem=234927, maxMem=277842493
15/04/23 15:43:44 INFO MemoryStore: Block broadcast_18 stored as values in memory (estimated size 11.2 KB, free 264.7 MB)
15/04/23 15:43:44 INFO MemoryStore: ensureFreeSpace(5713) called with curMem=246407, maxMem=277842493
15/04/23 15:43:44 INFO MemoryStore: Block broadcast_18_piece0 stored as bytes in memory (estimated size 5.6 KB, free 264.7 MB)
15/04/23 15:43:44 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on localhost:59043 (size: 5.6 KB, free: 264.9 MB)
15/04/23 15:43:44 INFO BlockManagerMaster: Updated info of block broadcast_18_piece0
15/04/23 15:43:44 INFO SparkContext: Created broadcast 18 from broadcast at DAGScheduler.scala:839
15/04/23 15:43:44 INFO DAGScheduler: Submitting 1 missing tasks from Stage 17 (MapPartitionsRDD[18] at map at <console>:33)
15/04/23 15:43:44 INFO TaskSchedulerImpl: Adding task set 17.0 with 1 tasks
15/04/23 15:43:44 INFO TaskSetManager: Starting task 0.0 in stage 17.0 (TID 22, localhost, PROCESS_LOCAL, 1301 bytes)
15/04/23 15:43:44 INFO Executor: Running task 0.0 in stage 17.0 (TID 22)
15/04/23 15:43:44 INFO HadoopRDD: Input split: file:/home/ec2-user/input/OUTPUT/temp:0+128629
15/04/23 15:43:44 ERROR Executor: Exception in task 0.0 in stage 17.0 (TID 22)
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
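The exception pinpoints the underlying problem: file.filter(...).count is an RDD action invoked from inside yourRdd.map, and Spark does not allow RDD operations to be nested inside other transformations (SPARK-5063). Below is a minimal sketch of the usual workaround, assuming the nested count was meant to be the per-word document frequency, with splitRdd, yourRdd, and N as defined above. Note it counts exact word tokens, whereas the original used a substring contains:

// Document frequency per word: count each word at most once per line.
val df = splitRdd
  .flatMap(arr => arr(1).split(" ").distinct.map(word => (word, 1)))
  .reduceByKey(_ + _)   // RDD[(String, Int)]: (word, docFreq)

// Join the (word, title) pairs against the frequencies, then take log(N / df).
val s = yourRdd
  .join(df)             // RDD[(String, (String, Int))]
  .map { case (word, (title, freq)) =>
    ((word, title), scala.math.log(N.toDouble / freq))
  }                     // RDD[((String, String), Double)]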
The immediate problem is that the type of 'yourRdd' is '(String, String)', so in 'yourRdd.map(word => ...)' word has type '(String, String)'. It fails because you are then trying to map over the RDD 'file' from inside that transformation, which is not supported. What are you trying to do? This looks like TF-IDF, right? – maasg
@maasg Yes! You are right! But I want to do it without using MLlib. – AngryPanda
@maasg: That's great. How did you arrive at that conclusion? – AngryPanda