
I've spent hours going through YouTube videos and tutorials trying to understand how to run a Spark word count program in Scala inside IntelliJ and turn it into a jar file, and I'm now thoroughly confused.

My Hello World runs, and I've understood that I need to go to Libraries and add the org.apache.spark spark-core dependency, but now I'm getting

Error: Could not find or load main class WordCount 

What confuses me further is why these two tutorials, which I thought were teaching the same thing, seem so different: tutorial1 tutorial2

The second one seems twice as long as the first, and it throws in things the first doesn't mention. Should I be relying on either of these to help me get a simple word count program up and running?

P.S. My code currently looks like this. I copied it from somewhere:

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark._ 

object WordCount { 
    def main(args: Array[String]) { 

    val sc = new SparkContext("local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) 
    val input = sc.textFile("../Data/input.txt") 
    val count = input.flatMap(line ⇒ line.split(" ")) 
     .map(word ⇒ (word, 1)) 
     .reduceByKey(_ + _) 
    count.saveAsTextFile("outfile") 
    System.out.println("OK"); 
    } 
} 

Your first link is a PDF from your computer... we can't access it –


@cricket_007 It's here: "Setting up Spark 2.0 with IntelliJ Community Edition.pdf" https://www.ibm.com/developerworks/community/files/app#/file/b41505ac-141b-45a2-84cd-1b6a8d5ae653 –

Answers

2

Create an sbt project in IntelliJ IDEA.

In build.sbt write:

scalaVersion := "2.11.11" 
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.2.0" 
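For reference, the same dependency can also be written with sbt's cross-version operator %%, which appends the _2.11 suffix automatically based on scalaVersion, so both forms resolve to the same artifact here:

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"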

Run sbt update on the command line (from your main project folder) or inside IntelliJ IDEA (press the Refresh button in the SBT tool window).

Write the code in src/main/scala/WordCount.scala:

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("Word Count")
      .setSparkHome("src/main/resources")
    val sc = new SparkContext(conf)
    val input = sc.textFile("src/main/resources/input.txt")
    val count = input.flatMap(line ⇒ line.split(" "))
      .map(word ⇒ (word, 1))
      .reduceByKey(_ + _)
    count.saveAsTextFile("src/main/resources/outfile")
    println("OK")
  }
}

Put your input file at src/main/resources/input.txt.

Run the code: press Ctrl+Shift+F10 or use sbt run.

A new subfolder outfile with several files should appear in the src/main/resources folder.
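For a quick sanity check you can also print the counts straight to the console, a minimal sketch using the same count RDD as above:

// collect() pulls all (word, count) pairs to the driver; fine for a small input file
count.collect().foreach { case (word, n) => println(s"$word: $n") }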

Console output:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
17/09/02 14:57:08 INFO SparkContext: Running Spark version 2.2.0 
17/09/02 14:57:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
17/09/02 14:57:09 WARN Utils: Your hostname, dmitin-HP-Pavilion-Notebook resolves to a loopback address: 127.0.1.1; using 192.168.1.104 instead (on interface wlan0) 
17/09/02 14:57:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 
17/09/02 14:57:09 INFO SparkContext: Submitted application: Word Count 
17/09/02 14:57:09 INFO SecurityManager: Changing view acls to: dmitin 
17/09/02 14:57:09 INFO SecurityManager: Changing modify acls to: dmitin 
17/09/02 14:57:09 INFO SecurityManager: Changing view acls groups to: 
17/09/02 14:57:09 INFO SecurityManager: Changing modify acls groups to: 
17/09/02 14:57:09 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(dmitin); groups with view permissions: Set(); users with modify permissions: Set(dmitin); groups with modify permissions: Set() 
17/09/02 14:57:10 INFO Utils: Successfully started service 'sparkDriver' on port 38186. 
17/09/02 14:57:10 INFO SparkEnv: Registering MapOutputTracker 
17/09/02 14:57:10 INFO SparkEnv: Registering BlockManagerMaster 
17/09/02 14:57:10 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information 
17/09/02 14:57:10 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 
17/09/02 14:57:10 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-d90a4735-6a2b-42b2-85ea-55b0ed9b1dfd 
17/09/02 14:57:10 INFO MemoryStore: MemoryStore started with capacity 1950.3 MB 
17/09/02 14:57:10 INFO SparkEnv: Registering OutputCommitCoordinator 
17/09/02 14:57:10 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
17/09/02 14:57:11 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.1.104:4040 
17/09/02 14:57:11 INFO Executor: Starting executor ID driver on host localhost 
17/09/02 14:57:11 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 46432. 
17/09/02 14:57:11 INFO NettyBlockTransferService: Server created on 192.168.1.104:46432 
17/09/02 14:57:11 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy 
17/09/02 14:57:11 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.1.104, 46432, None) 
17/09/02 14:57:11 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.104:46432 with 1950.3 MB RAM, BlockManagerId(driver, 192.168.1.104, 46432, None) 
17/09/02 14:57:11 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.104, 46432, None) 
17/09/02 14:57:11 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.1.104, 46432, None) 
17/09/02 14:57:12 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 214.5 KB, free 1950.1 MB) 
17/09/02 14:57:12 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.4 KB, free 1950.1 MB) 
17/09/02 14:57:12 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.104:46432 (size: 20.4 KB, free: 1950.3 MB) 
17/09/02 14:57:12 INFO SparkContext: Created broadcast 0 from textFile at WordCount.scala:16 
17/09/02 14:57:12 INFO FileInputFormat: Total input paths to process : 1 
17/09/02 14:57:12 INFO SparkContext: Starting job: saveAsTextFile at WordCount.scala:20 
17/09/02 14:57:12 INFO DAGScheduler: Registering RDD 3 (map at WordCount.scala:18) 
17/09/02 14:57:12 INFO DAGScheduler: Got job 0 (saveAsTextFile at WordCount.scala:20) with 1 output partitions 
17/09/02 14:57:12 INFO DAGScheduler: Final stage: ResultStage 1 (saveAsTextFile at WordCount.scala:20) 
17/09/02 14:57:12 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0) 
17/09/02 14:57:12 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0) 
17/09/02 14:57:12 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at map at WordCount.scala:18), which has no missing parents 
17/09/02 14:57:13 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.7 KB, free 1950.1 MB) 
17/09/02 14:57:13 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.7 KB, free 1950.1 MB) 
17/09/02 14:57:13 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.104:46432 (size: 2.7 KB, free: 1950.3 MB) 
17/09/02 14:57:13 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006 
17/09/02 14:57:13 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at map at WordCount.scala:18) (first 15 tasks are for partitions Vector(0)) 
17/09/02 14:57:13 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 
17/09/02 14:57:13 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 4873 bytes) 
17/09/02 14:57:13 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) 
17/09/02 14:57:13 INFO HadoopRDD: Input split: file:/home/dmitin/Projects/sparkdemo/src/main/resources/input.txt:0+11 
17/09/02 14:57:13 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1154 bytes result sent to driver 
17/09/02 14:57:13 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 289 ms on localhost (executor driver) (1/1) 
17/09/02 14:57:13 INFO DAGScheduler: ShuffleMapStage 0 (map at WordCount.scala:18) finished in 0,321 s 
17/09/02 14:57:13 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
17/09/02 14:57:13 INFO DAGScheduler: looking for newly runnable stages 
17/09/02 14:57:13 INFO DAGScheduler: running: Set() 
17/09/02 14:57:13 INFO DAGScheduler: waiting: Set(ResultStage 1) 
17/09/02 14:57:13 INFO DAGScheduler: failed: Set() 
17/09/02 14:57:13 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at WordCount.scala:20), which has no missing parents 
17/09/02 14:57:13 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 65.3 KB, free 1950.0 MB) 
17/09/02 14:57:13 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 23.3 KB, free 1950.0 MB) 
17/09/02 14:57:13 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.1.104:46432 (size: 23.3 KB, free: 1950.3 MB) 
17/09/02 14:57:13 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006 
17/09/02 14:57:13 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at WordCount.scala:20) (first 15 tasks are for partitions Vector(0)) 
17/09/02 14:57:13 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks 
17/09/02 14:57:13 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, ANY, 4621 bytes) 
17/09/02 14:57:13 INFO Executor: Running task 0.0 in stage 1.0 (TID 1) 
17/09/02 14:57:13 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks 
17/09/02 14:57:13 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 10 ms 
17/09/02 14:57:13 INFO FileOutputCommitter: Saved output of task 'attempt_20170902145712_0001_m_000000_1' to file:/home/dmitin/Projects/sparkdemo/src/main/resources/outfile/_temporary/0/task_20170902145712_0001_m_000000 
17/09/02 14:57:13 INFO SparkHadoopMapRedUtil: attempt_20170902145712_0001_m_000000_1: Committed 
17/09/02 14:57:13 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1224 bytes result sent to driver 
17/09/02 14:57:13 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 221 ms on localhost (executor driver) (1/1) 
17/09/02 14:57:13 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
17/09/02 14:57:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at WordCount.scala:20) finished in 0,223 s 
17/09/02 14:57:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at WordCount.scala:20, took 1,222133 s 
OK 
17/09/02 14:57:13 INFO SparkContext: Invoking stop() from shutdown hook 
17/09/02 14:57:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.104:4040 
17/09/02 14:57:13 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
17/09/02 14:57:13 INFO MemoryStore: MemoryStore cleared 
17/09/02 14:57:13 INFO BlockManager: BlockManager stopped 
17/09/02 14:57:13 INFO BlockManagerMaster: BlockManagerMaster stopped 
17/09/02 14:57:13 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 
17/09/02 14:57:13 INFO SparkContext: Successfully stopped SparkContext 
17/09/02 14:57:13 INFO ShutdownHookManager: Shutdown hook called 
17/09/02 14:57:13 INFO ShutdownHookManager: Deleting directory /tmp/spark-663047b2-415a-45b5-bcad-20bd18270baa 

Process finished with exit code 0 

Thanks - this works well. Although I'm getting "Error:(1, 12) object apache is not a member of package org import org.apache.spark.{SparkConf, SparkContext}", I'm looking into it. – user1761806


@user1761806 Looks like a dependency resolution problem. Try sbt clean and then sbt update. Or try re-importing the project in IntelliJ. –


Actually I solved it. sbt update should be done from your main project folder, not just from any location. – user1761806

0

You could always make WordCount extend App, and that should work. I believe this comes down to the way you've organised your project.

Read more about the App trait here:

http://www.scala-lang.org/api/2.12.1/scala/App.html
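
For illustration, a minimal sketch of the same word count written with the App trait instead of a main method (the paths are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

// The App trait supplies the main method, so the object body becomes the program entry point.
object WordCount extends App {
  val conf = new SparkConf().setMaster("local").setAppName("Word Count")
  val sc = new SparkContext(conf)

  val counts = sc.textFile("src/main/resources/input.txt")
    .flatMap(line => line.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)

  counts.saveAsTextFile("src/main/resources/outfile")
  sc.stop()
}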

In any case, make sure your directory structure looks like this:

./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/WordCount.scala


Yes, I saw the extends App approach mentioned, but assumed the main-method style was more Java-like. Which way do most developers use? (P.S. I'm new to Scala and haven't done Java since my uni days) – user1761806


Oh no, that's completely fine... –


I believe that if you have the directory layout I mentioned, you don't even need extends App :) –

0

Confirmed. Using File -> New -> Project -> Scala -> sbt -> (choose the project location and name) -> Finish, I wrote the following:

package com.spark.app

import org.scalatra._
import org.apache.spark.{SparkContext, SparkConf}

class MySparkAppServlet extends MySparkAppStack {

  get("/wc") {
    val inputFile = "/home/limitless/Documents/projects/test/my-spark-app/README.md"
    val outputFile = "/home/limitless/Documents/projects/test/my-spark-app/README.txt"
    val conf = new SparkConf().setAppName("wordCount").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val input = sc.textFile(inputFile)
    val words = input.flatMap(line => line.split(" "))
    val counts = words.map(word => (word, 1)).reduceByKey { case (x, y) => x + y }
    counts.saveAsTextFile(outputFile)
  }

}
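
MySparkAppStack here is presumably the stack trait generated by the Scalatra sbt/giter8 project template (its name depends on what you called the project). For the /wc route to compile, the project's build.sbt still needs the Spark dependency alongside the Scalatra ones, for example:

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"

Requesting the /wc route then runs the word count and writes the results to the outputFile path.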